[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-29 Thread shubhamchopra
Github user shubhamchopra commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r7684
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import scala.collection.mutable
+import scala.util.Random
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.internal.Logging
+
+/**
+ * ::DeveloperApi::
+ * BlockReplicationPrioritization provides logic for prioritizing a sequence of peers for
+ * replicating blocks. BlockManager will replicate to each peer returned in order until the
+ * desired replication order is reached. If a replication fails, prioritize() will be called
+ * again to get a fresh prioritization.
+ */
+@DeveloperApi
+trait BlockReplicationPolicy {
+
+  /**
+   * Method to prioritize a bunch of candidate peers of a block
+   *
+   * @param blockManagerId Id of the current BlockManager for self identification
+   * @param peers A list of peers of a BlockManager
+   * @param peersReplicatedTo Set of peers already replicated to
+   * @param blockId BlockId of the block being replicated. This can be used as a source of
+   *                randomness if needed.
+   * @param numPeersToReplicateTo Number of peers we need to replicate to
+   * @return A prioritized list of peers. Lower the index of a peer, higher its priority.
+   *         This returns a list of size at most `numPeersToReplicateTo`.
+   */
+  def prioritize(
+      blockManagerId: BlockManagerId,
+      peers: Seq[BlockManagerId],
+      peersReplicatedTo: mutable.HashSet[BlockManagerId],
+      blockId: BlockId,
+      numPeersToReplicateTo: Int): List[BlockManagerId]
+}
+
+@DeveloperApi
+class RandomBlockReplicationPolicy
+  extends BlockReplicationPolicy
+  with Logging {
+
+  /**
+   * Method to prioritize a bunch of candidate peers of a block. This is a basic implementation,
+   * that just makes sure we put blocks on different hosts, if possible
+   *
+   * @param blockManagerId Id of the current BlockManager for self identification
+   * @param peers A list of peers of a BlockManager
+   * @param peersReplicatedTo Set of peers already replicated to
+   * @param blockId BlockId of the block being replicated. This can be used as a source of
+   *                randomness if needed.
+   * @return A prioritized list of peers. Lower the index of a peer, higher its priority
+   */
+  override def prioritize(
+      blockManagerId: BlockManagerId,
+      peers: Seq[BlockManagerId],
+      peersReplicatedTo: mutable.HashSet[BlockManagerId],
+      blockId: BlockId,
+      numReplicas: Int): List[BlockManagerId] = {
+    val random = new Random(blockId.hashCode)
+    logDebug(s"Input peers : ${peers.mkString(", ")}")
+    val prioritizedPeers = if (peers.size > numReplicas) {
+      getSampleIds(peers.size, numReplicas, random).map(peers(_))
+    } else {
+      if (peers.size < numReplicas) {
+        logWarning(s"Expecting ${numReplicas} replicas with only ${peers.size} peer/s.")
+      }
+      random.shuffle(peers).toList
+    }
+    logDebug(s"Prioritized peers : ${prioritizedPeers.mkString(", ")}")
+    prioritizedPeers
+  }
+
+  /**
+   * Uses sampling algorithm by Robert Floyd. Finds a random sample in O(n) while
+   * minimizing space usage
+   * [[http://math.stackexchange.com/questions/178690/
+   * whats-the-proof-of-correctness-for-robert-floyds-algorithm-for-selecting-a-sin]]
+   *
+   * @param n total number of indices
+   * @param m number of samples needed
+   * @param r random number generator
+   * @return list of m random unique indices
+   */
+  private def getSampleIds(n: Int, m: Int, r: Random): List[Int] = {
+    val 
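
The quoted method body is cut off above. As a rough sketch (not the PR's actual implementation), Floyd's sampling algorithm as described in the doc comment picks `m` distinct indices from `[0, n)` like this:

```scala
import scala.collection.mutable
import scala.util.Random

// Floyd's sampling: m distinct indices from [0, n) in O(m) time and space.
def sampleIds(n: Int, m: Int, r: Random): List[Int] = {
  val selected = mutable.HashSet.empty[Int]
  for (i <- n - m until n) {
    val t = r.nextInt(i + 1) // uniform in [0, i]
    // If t was already selected, i itself cannot have been selected yet, so take i.
    selected += (if (selected.contains(t)) i else t)
  }
  selected.toList
}
```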

[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r76170909
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala ---
@@ -0,0 +1,112 @@
...
+  private def getSampleIds(n: Int, m: Int, r: Random): List[Int] = {
+    val indices = 

[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r76170841
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala ---
@@ -0,0 +1,112 @@
...
+/**
+ * ::DeveloperApi::
+ * BlockReplicationPrioritization provides logic for prioritizing a sequence of peers for
+ * replicating blocks. BlockManager will replicate to each peer returned in order until the
+ * desired replication order is reached. If a replication fails, prioritize() will be called
+ * again to get a fresh prioritization.
+ */
+@DeveloperApi
+trait BlockReplicationPolicy {
--- End diff --

can we create a BlockReplicationPolicySuite and add unit tests for 
RandomBlockReplicationPolicy? You'd want to verify it is returning you the 
right number of peers and make sure they don't duplicate each other ...
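
A sketch of what such a suite might check, with assumed names (`TestBlockId`, host/port values, and the suite name are placeholders):

```scala
import scala.collection.mutable

import org.apache.spark.SparkFunSuite
import org.apache.spark.storage._

class BlockReplicationPolicySuite extends SparkFunSuite {
  test("random policy returns the requested number of distinct peers") {
    val policy = new RandomBlockReplicationPolicy
    val peers = (1 to 10).map(i => BlockManagerId(s"store-$i", "localhost", 1000 + i))
    (1 to 10).foreach { numReplicas =>
      val prioritized = policy.prioritize(
        BlockManagerId("self", "localhost", 1000),
        peers,
        mutable.HashSet.empty[BlockManagerId],
        TestBlockId("test-block"),
        numReplicas)
      // At most numReplicas peers, capped by the number of candidates available.
      assert(prioritized.size == math.min(numReplicas, peers.size))
      // No peer should appear twice.
      assert(prioritized.toSet.size == prioritized.size)
    }
  }
}
```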






[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r76170799
  
--- Diff: core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import org.apache.spark.SparkConf
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * ::DeveloperApi::
+ * TopologyMapper provides topology information for a given host
+ * @param conf SparkConf to get required properties, if needed
+ */
+@DeveloperApi
+abstract class TopologyMapper(conf: SparkConf) {
+  /**
+   * Gets the topology information given the host name
+   *
+   * @param hostname Hostname
+   * @return topology information for the given hostname. One can use a 'topology delimiter'
+   *         to make this topology information nested.
+   *         For example : '/myrack/myhost', where '/' is the topology delimiter,
+   *         'myrack' is the topology identifier, and 'myhost' is the individual host.
+   *         This function only returns the topology information without the hostname.
+   *         This information can be used when choosing executors for block replication
+   *         to discern executors from a different rack than a candidate executor, for example.
+   *
+   *         An implementation can choose to use empty strings or None in case topology info
+   *         is not available. This would imply that all such executors belong to the same rack.
+   */
+  def getTopologyForHost(hostname: String): Option[String]
+}
+
+@DeveloperApi
+class DefaultTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) with Logging {
+  override def getTopologyForHost(hostname: String): Option[String] = {
+    logDebug(s"Got a request for $hostname")
+    Some("DefaultRack")
+  }
+}
+
+/**
+ * A simple file based topology mapper. This expects topology information provided as a
+ * [[java.util.Properties]] file. The name of the file is obtained from SparkConf property
+ * `spark.replication.topologyawareness.topologyFile`. To use this topology mapper, set the
+ * `spark.replication.topologyawareness.topologyMapper` property to
+ * [[org.apache.spark.storage.FileBasedTopologyMapper]]
+ * @param conf SparkConf object
+ */
+@DeveloperApi
+class FileBasedTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) with Logging {
--- End diff --

can we add a unit test for FileBasedTopologyMapper itself?
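
A sketch of such a test, assuming the config key quoted in the diff and a temporary properties file (the host and rack values are made up):

```scala
import java.io.{File, PrintWriter}

import org.apache.spark.SparkConf
import org.apache.spark.storage.FileBasedTopologyMapper

val propsFile = File.createTempFile("topology", ".properties")
val writer = new PrintWriter(propsFile)
writer.write("host-1=/rack-1\nhost-2=/rack-2\n")
writer.close()

val conf = new SparkConf(false)
  .set("spark.replication.topologyawareness.topologyfile", propsFile.getAbsolutePath)
val mapper = new FileBasedTopologyMapper(conf)
// Hosts listed in the file should map to their configured topology info.
assert(mapper.getTopologyForHost("host-1") == Some("/rack-1"))
```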






[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r76170783
  
--- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala ---
@@ -37,6 +35,8 @@ import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
 import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.storage.StorageLevel._
 
+import scala.collection.mutable
--- End diff --

i think this will fail the style checker for import order





[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r76170745
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ---
@@ -55,10 +55,22 @@ class BlockManagerMasterEndpoint(
   private val askThreadPool = ThreadUtils.newDaemonCachedThreadPool("block-manager-ask-thread-pool")
   private implicit val askExecutionContext = ExecutionContext.fromExecutorService(askThreadPool)
 
+  private val topologyMapper = {
+    val topologyMapperClassName = conf.get(
+      "spark.replication.topologyawareness.topologyMapper",
--- End diff --

```
spark.storage.replication.topologyMapper
```
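
Applied to the lookup above, the rename would read roughly (a sketch; `conf` is the endpoint's SparkConf):

```scala
val topologyMapperClassName = conf.get(
  "spark.storage.replication.topologyMapper",
  classOf[DefaultTopologyMapper].getName)
```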





[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r76170720
  
--- Diff: core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala ---
@@ -0,0 +1,82 @@
...
+@DeveloperApi
+class DefaultTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) with Logging {
--- End diff --

add a simple classdoc here - something like "A TopologyMapper that assumes 
all nodes are in the same rack" 
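
Applied to the class in the diff, the suggestion would look roughly like:

```scala
/**
 * A TopologyMapper that assumes all nodes are in the same rack
 */
@DeveloperApi
class DefaultTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) with Logging {
  override def getTopologyForHost(hostname: String): Option[String] = {
    logDebug(s"Got a request for $hostname")
    Some("DefaultRack")
  }
}
```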





[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r76170655
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -160,8 +164,25 @@ private[spark] class BlockManager(
     blockTransferService.init(this)
     shuffleClient.init(appId)
 
-    blockManagerId = BlockManagerId(
-      executorId, blockTransferService.hostName, blockTransferService.port)
+    blockReplicationPolicy = {
+      val priorityClass = conf.get(
+        "spark.replication.topologyawareness.prioritizer",
+        "org.apache.spark.storage.RandomBlockReplicationPolicy")
--- End diff --

classOf[RandomBlockReplicationPolicy].getName
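
That is, derive the default from the class itself instead of a string literal (a sketch of the changed lines only; `conf` is the surrounding SparkConf):

```scala
import org.apache.spark.storage.RandomBlockReplicationPolicy

val priorityClass = conf.get(
  "spark.replication.topologyawareness.prioritizer",
  classOf[RandomBlockReplicationPolicy].getName)
```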





[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r76170624
  
--- Diff: core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala ---
@@ -0,0 +1,82 @@
...
+@DeveloperApi
+class DefaultTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) with Logging {
+  override def getTopologyForHost(hostname: String): Option[String] = {
+    logDebug(s"Got a request for $hostname")
+    Some("DefaultRack")
--- End diff --

should this return None instead?






[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r76170605
  
--- Diff: core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala ---
@@ -0,0 +1,82 @@
...
+@DeveloperApi
+class FileBasedTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) with Logging {
+  val topologyFile = conf.getOption("spark.replication.topologyawareness.topologyfile")
+  require(topologyFile.isDefined, "Please provide topology file for FileBasedTopologyMapper.")
--- End diff --

"Please specify topology file via spark.storage.replication.topologyFile 
for FileBasedTopologyMapper."





[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r76170578
  
--- Diff: core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala ---
@@ -0,0 +1,82 @@
...
+@DeveloperApi
+class FileBasedTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) with Logging {
+  val topologyFile = conf.getOption("spark.replication.topologyawareness.topologyfile")
--- End diff --

I was looking at existing configs - can we change the config to
```
spark.storage.replication.topologyFile
```





[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r76170505
  
--- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala ---
@@ -347,6 +347,47 @@ class BlockManagerReplicationSuite extends SparkFunSuite
   }
 
   /**
+   * Test if we get the required number of peers when using random sampling from
+   * RandomBlockReplicationPolicy
+   */
+  test(s"block replication - random block replication policy") {
+    val numBlockManagers = 10
+    val storeSize = 1000
+    val blockManagers = (1 to numBlockManagers).map { i =>
+      BlockManagerId(s"store-$i", "localhost", 1000 + i, None)
+    }
+    val candidateBlockManager = BlockManagerId("test-store", "localhost", 1000, None)
+    val replicationPolicy = new RandomBlockReplicationPolicy
+    val blockId = "test-block"
+
+    (1 to 10).foreach {numReplicas =>
+      logInfo(s"Num replicas : $numReplicas")
--- End diff --

is this logging information useful in tests?






[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r76170404
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala ---
@@ -0,0 +1,112 @@
...
+  def prioritize(
+      blockManagerId: BlockManagerId,
+      peers: Seq[BlockManagerId],
+      peersReplicatedTo: mutable.HashSet[BlockManagerId],
+      blockId: BlockId,
+      numPeersToReplicateTo: Int): List[BlockManagerId]
--- End diff --

numReplicas?





[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-18 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r75427102
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -1088,109 +1108,86 @@ private[spark] class BlockManager(
   }
 
   /**
-   * Replicate block to another node. Not that this is a blocking call that returns after
+   * Replicate block to another node. Note that this is a blocking call that returns after
    * the block has been replicated.
+   *
+   * @param blockId
+   * @param data
+   * @param level
+   * @param classTag
    */
   private def replicate(
-      blockId: BlockId,
-      data: ChunkedByteBuffer,
-      level: StorageLevel,
-      classTag: ClassTag[_]): Unit = {
+    blockId: BlockId,
+    data: ChunkedByteBuffer,
+    level: StorageLevel,
+    classTag: ClassTag[_]): Unit = {
+
     val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
-    val numPeersToReplicateTo = level.replication - 1
-    val peersForReplication = new ArrayBuffer[BlockManagerId]
-    val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
-    val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
     val tLevel = StorageLevel(
       useDisk = level.useDisk,
       useMemory = level.useMemory,
       useOffHeap = level.useOffHeap,
       deserialized = level.deserialized,
       replication = 1)
+
+    val numPeersToReplicateTo = level.replication - 1
+
     val startTime = System.currentTimeMillis
-    val random = new Random(blockId.hashCode)
-
-    var replicationFailed = false
-    var failures = 0
-    var done = false
-
-    // Get cached list of peers
-    peersForReplication ++= getPeers(forceFetch = false)
-
-    // Get a random peer. Note that this selection of a peer is deterministic on the block id.
-    // So assuming the list of peers does not change and no replication failures,
-    // if there are multiple attempts in the same node to replicate the same block,
-    // the same set of peers will be selected.
-    def getRandomPeer(): Option[BlockManagerId] = {
-      // If replication had failed, then force update the cached list of peers and remove the peers
-      // that have been already used
-      if (replicationFailed) {
-        peersForReplication.clear()
-        peersForReplication ++= getPeers(forceFetch = true)
-        peersForReplication --= peersReplicatedTo
-        peersForReplication --= peersFailedToReplicateTo
-      }
-      if (!peersForReplication.isEmpty) {
-        Some(peersForReplication(random.nextInt(peersForReplication.size)))
-      } else {
-        None
-      }
-    }
 
-    // One by one choose a random peer and try uploading the block to it
-    // If replication fails (e.g., target peer is down), force the list of cached peers
-    // to be re-fetched from driver and then pick another random peer for replication. Also
-    // temporarily black list the peer for which replication failed.
-    //
-    // This selection of a peer and replication is continued in a loop until one of the
-    // following 3 conditions is fulfilled:
-    // (i) specified number of peers have been replicated to
-    // (ii) too many failures in replicating to peers
-    // (iii) no peer left to replicate to
-    //
-    while (!done) {
-      getRandomPeer() match {
-        case Some(peer) =>
-          try {
-            val onePeerStartTime = System.currentTimeMillis
-            logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
-            blockTransferService.uploadBlockSync(
-              peer.host,
-              peer.port,
-              peer.executorId,
-              blockId,
-              new NettyManagedBuffer(data.toNetty),
-              tLevel,
-              classTag)
-            logTrace(s"Replicated $blockId of ${data.size} bytes to $peer in %s ms"
-              .format(System.currentTimeMillis - onePeerStartTime))
-            peersReplicatedTo += peer
-            peersForReplication -= peer
-            replicationFailed = false
-            if (peersReplicatedTo.size == numPeersToReplicateTo) {
-              done = true  // specified number of peers have been replicated to
-            }
-          } catch {
-            case e: Exception =>
-              logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e)
-              failures += 1
-              replicationFailed = true
-              peersFailedToReplicateTo += peer
-              if (failures > maxReplicationFailures) { // 
[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-18 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r75426940
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -1088,109 +1108,86 @@ private[spark] class BlockManager(
...

[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-18 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r75426951
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -1088,109 +1108,86 @@ private[spark] class BlockManager(
...

[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-18 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r75426791
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -1088,109 +1108,86 @@ private[spark] class BlockManager(
...

[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-18 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r75426781
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -1088,109 +1108,86 @@ private[spark] class BlockManager(
...

[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-18 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r75426753
  

[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-18 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r75426714
  

[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-18 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r75426587
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import scala.util.Random
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.internal.Logging
+
+/**
+ * ::DeveloperApi::
+ * BlockReplicationPrioritization provides logic for prioritizing a sequence of peers for
+ * replicating blocks. BlockManager will replicate to each peer returned in order until the
+ * desired replication order is reached. If a replication fails, prioritize() will be called
+ * again to get a fresh prioritization.
+ */
+@DeveloperApi
+trait BlockReplicationPrioritization {
--- End diff --

can we just name this BlockReplicationPolicy?




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-18 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r75426605
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -20,6 +20,7 @@ package org.apache.spark.storage
 import java.io._
 import java.nio.ByteBuffer
 
+import scala.annotation.tailrec
--- End diff --

is this used anywhere?
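For context, `scala.annotation.tailrec` only earns its import if some method actually relies on it. A minimal sketch (not this PR's code; plain Ints stand in for peers) of the kind of tail-recursive retry loop that would justify it:

```scala
import scala.annotation.tailrec

object ReplicationLoopSketch {
  // @tailrec makes the compiler reject the method unless the recursion
  // compiles down to a loop, so a long retry chain cannot overflow the stack.
  @tailrec
  def replicateLoop(remaining: List[Int], done: Int, wanted: Int): Int = {
    if (done == wanted || remaining.isEmpty) done // enough replicas, or no peers left
    else replicateLoop(remaining.tail, done + 1, wanted)
  }
}
```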



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-18 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r75426552
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -1088,109 +1108,86 @@ private[spark] class BlockManager(
   }
 
   /**
-   * Replicate block to another node. Not that this is a blocking call that returns after
+   * Replicate block to another node. Note that this is a blocking call that returns after
* the block has been replicated.
+   *
+   * @param blockId
+   * @param data
+   * @param level
+   * @param classTag
*/
   private def replicate(
-  blockId: BlockId,
-  data: ChunkedByteBuffer,
-  level: StorageLevel,
-  classTag: ClassTag[_]): Unit = {
+blockId: BlockId,
--- End diff --

reset the change here - use 4 space indent


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-18 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r75426567
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -1088,109 +1108,86 @@ private[spark] class BlockManager(
   }
 
   /**
-   * Replicate block to another node. Not that this is a blocking call that returns after
+   * Replicate block to another node. Note that this is a blocking call that returns after
* the block has been replicated.
+   *
+   * @param blockId
--- End diff --

remove these params unless you really are going to document them.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-18 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r75426531
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala ---
@@ -69,24 +72,37 @@ class BlockManagerId private (
 out.writeUTF(executorId_)
 out.writeUTF(host_)
 out.writeInt(port_)
+out.writeBoolean(topologyInfo_.isDefined)
+// we only write topologyInfo if we have it
+topologyInfo.foreach(out.writeUTF(_))
   }
 
  override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
 executorId_ = in.readUTF()
 host_ = in.readUTF()
 port_ = in.readInt()
+val isTopologyInfoAvailable = in.readBoolean()
+topologyInfo_ = if (isTopologyInfoAvailable) {
--- End diff --

it might be more clear to do
```
if (isTopologyInfoAvailable) {
  topologyInfo_ = Option(in.readUTF())
} else {
  topologyInfo_ = None
}
```

or

```
topologyInfo_ = if (isTopologyInfoAvailable) Option(in.readUTF()) else None
```
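Both variants express the same presence-flag pattern for the optional field. A self-contained sketch of that pattern (hypothetical ExampleId class, not Spark's BlockManagerId):

```scala
import java.io.{Externalizable, ObjectInput, ObjectOutput}

class ExampleId(var host: String, var topologyInfo: Option[String])
  extends Externalizable {

  // Externalizable requires a public no-arg constructor for deserialization
  def this() = this(null, None)

  override def writeExternal(out: ObjectOutput): Unit = {
    out.writeUTF(host)
    out.writeBoolean(topologyInfo.isDefined) // presence flag first
    topologyInfo.foreach(out.writeUTF(_))    // payload only when present
  }

  override def readExternal(in: ObjectInput): Unit = {
    host = in.readUTF()
    topologyInfo = if (in.readBoolean()) Option(in.readUTF()) else None
  }
}
```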


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-18 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r75426476
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala ---
@@ -101,10 +117,18 @@ private[spark] object BlockManagerId {
* @param execId ID of the executor.
* @param host Host name of the block manager.
* @param port Port of the block manager.
+   * @param topologyInfo topology information for the blockmanager, if available
+   * This can be network topology information for use while choosing peers
+   * while replicating data blocks. More information available here:
+   * [[org.apache.spark.storage.TopologyMapper]]
* @return A new [[org.apache.spark.storage.BlockManagerId]].
*/
-  def apply(execId: String, host: String, port: Int): BlockManagerId =
-getCachedBlockManagerId(new BlockManagerId(execId, host, port))
+  def apply(
+execId: String,
--- End diff --

4 space indent here too


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-18 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r75426446
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ---
@@ -298,7 +310,17 @@ class BlockManagerMasterEndpoint(
 ).map(_.flatten.toSeq)
   }
 
-  private def register(id: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef) {
+  private def register(dummyId: BlockManagerId,
+maxMemSize: Long,
--- End diff --

Can you also add a method doc saying this returns the same id with topology 
information attached?



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-18 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r75426435
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ---
@@ -298,7 +310,17 @@ class BlockManagerMasterEndpoint(
 ).map(_.flatten.toSeq)
   }
 
-  private def register(id: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef) {
+  private def register(dummyId: BlockManagerId,
+maxMemSize: Long,
--- End diff --

also instead of dummyId, I'd call it "idWithoutTopologyInfo"


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-18 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r75426412
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ---
@@ -298,7 +310,17 @@ class BlockManagerMasterEndpoint(
 ).map(_.flatten.toSeq)
   }
 
-  private def register(id: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef) {
+  private def register(dummyId: BlockManagerId,
+maxMemSize: Long,
--- End diff --

4 space indent, and put all the arguments on its own line, e.g.
```
private def register(
    dummyId: BlockManagerId,
    maxMemSize: Long,
    slaveEndpoint: RpcEndpointRef): BlockManagerId = {
  ...
}
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-18 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r75426383
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala ---
@@ -50,12 +50,20 @@ class BlockManagerMaster(
 logInfo("Removal of executor " + execId + " requested")
   }
 
-  /** Register the BlockManager's id with the driver. */
+  /**
+   * Register the BlockManager's id with the driver. The input BlockManagerId does not contain
+   * topology information. This information is obtained from the master and we respond with an
+   * updated BlockManagerId fleshed out with this information.
+   */
   def registerBlockManager(
-  blockManagerId: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef): Unit = {
+blockManagerId: BlockManagerId,
--- End diff --

indent 4 spaces


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-18 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r75426300
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -160,8 +163,25 @@ private[spark] class BlockManager(
 blockTransferService.init(this)
 shuffleClient.init(appId)
 
-blockManagerId = BlockManagerId(
-  executorId, blockTransferService.hostName, blockTransferService.port)
+blockReplicationPrioritizer = {
+  val priorityClass = conf.get(
+"spark.replication.topologyawareness.prioritizer",
+"org.apache.spark.storage.DefaultBlockReplicationPrioritization")
+  val clazz = Utils.classForName(priorityClass)
+  val ret = clazz.newInstance.asInstanceOf[BlockReplicationPrioritization]
+  logInfo(s"Using $priorityClass for prioritizing peers")
--- End diff --

```
Using $priorityClass for block replication policy
```
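For reference, the diff above selects the policy reflectively through that conf key, so a deployment opts in with something like the following (the class name here is made up):

```scala
import org.apache.spark.SparkConf

// Point the key from the diff at a custom implementation on the classpath;
// com.example.MyRackAwarePrioritization is a hypothetical class name.
val conf = new SparkConf()
  .set("spark.replication.topologyawareness.prioritizer",
    "com.example.MyRackAwarePrioritization")
```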


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-18 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r75426231
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import scala.util.Random
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.internal.Logging
+
+/**
+ * ::DeveloperApi::
+ * BlockReplicationPrioritization provides logic for prioritizing a sequence of peers for
+ * replicating blocks. BlockManager will replicate to each peer returned in order until the
+ * desired replication order is reached. If a replication fails, prioritize() will be called
+ * again to get a fresh prioritization.
+ */
+@DeveloperApi
+trait BlockReplicationPrioritization {
+
+  /**
+   * Method to prioritize a bunch of candidate peers of a block
+   *
+   * @param blockManagerId Id of the current BlockManager for self identification
+   * @param peers A list of peers of a BlockManager
+   * @param peersReplicatedTo Set of peers already replicated to
+   * @param blockId BlockId of the block being replicated. This can be used as a source of
+   *randomness if needed.
+   * @return A prioritized list of peers. Lower the index of a peer, higher its priority
+   */
+  def prioritize(
+blockManagerId: BlockManagerId,
+peers: Seq[BlockManagerId],
--- End diff --

also rather than a full prioritization, can we also pass in a number of 
replicas wanted and just return a list there?
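As a sketch of the shape this suggests (illustrative names only, with plain strings standing in for BlockManagerId): the caller passes the number of replicas it wants and gets back at most that many peers, so an implementation can stop early instead of ranking every candidate.

```scala
import scala.util.Random

object SizedPrioritizationSketch {
  def prioritize(candidates: Seq[String], numReplicas: Int, seed: Int): List[String] = {
    val random = new Random(seed) // e.g. seeded with blockId.hashCode
    // shuffle once, then keep only as many peers as the caller asked for
    random.shuffle(candidates.toList).take(numReplicas)
  }
}
```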



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-18 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r75426199
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import scala.util.Random
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.internal.Logging
+
+/**
+ * ::DeveloperApi::
+ * BlockReplicationPrioritization provides logic for prioritizing a sequence of peers for
+ * replicating blocks. BlockManager will replicate to each peer returned in order until the
+ * desired replication order is reached. If a replication fails, prioritize() will be called
+ * again to get a fresh prioritization.
+ */
+@DeveloperApi
+trait BlockReplicationPrioritization {
+
+  /**
+   * Method to prioritize a bunch of candidate peers of a block
+   *
+   * @param blockManagerId Id of the current BlockManager for self identification
+   * @param peers A list of peers of a BlockManager
+   * @param peersReplicatedTo Set of peers already replicated to
+   * @param blockId BlockId of the block being replicated. This can be used as a source of
+   *randomness if needed.
+   * @return A prioritized list of peers. Lower the index of a peer, higher its priority
+   */
+  def prioritize(
+blockManagerId: BlockManagerId,
+peers: Seq[BlockManagerId],
--- End diff --

is passing in all the peers a performance concern?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-18 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r75426189
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import scala.util.Random
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.internal.Logging
+
+/**
+ * ::DeveloperApi::
+ * BlockReplicationPrioritization provides logic for prioritizing a sequence of peers for
+ * replicating blocks. BlockManager will replicate to each peer returned in order until the
+ * desired replication order is reached. If a replication fails, prioritize() will be called
+ * again to get a fresh prioritization.
+ */
+@DeveloperApi
+trait BlockReplicationPrioritization {
+
+  /**
+   * Method to prioritize a bunch of candidate peers of a block
+   *
+   * @param blockManagerId Id of the current BlockManager for self identification
+   * @param peers A list of peers of a BlockManager
+   * @param peersReplicatedTo Set of peers already replicated to
+   * @param blockId BlockId of the block being replicated. This can be used as a source of
+   *randomness if needed.
+   * @return A prioritized list of peers. Lower the index of a peer, higher its priority
+   */
+  def prioritize(
+blockManagerId: BlockManagerId,
+peers: Seq[BlockManagerId],
+peersReplicatedTo: Set[BlockManagerId],
+blockId: BlockId): Seq[BlockManagerId]
+}
+
+@DeveloperApi
+class DefaultBlockReplicationPrioritization
--- End diff --

instead of Default, I'd call this RandomBlockReplicationPrioritization to 
better reflect what it does.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-18 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r75426147
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import scala.util.Random
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.internal.Logging
+
+/**
+ * ::DeveloperApi::
+ * BlockReplicationPrioritization provides logic for prioritizing a sequence of peers for
+ * replicating blocks. BlockManager will replicate to each peer returned in order until the
+ * desired replication order is reached. If a replication fails, prioritize() will be called
+ * again to get a fresh prioritization.
+ */
+@DeveloperApi
+trait BlockReplicationPrioritization {
+
+  /**
+   * Method to prioritize a bunch of candidate peers of a block
+   *
+   * @param blockManagerId Id of the current BlockManager for self identification
+   * @param peers A list of peers of a BlockManager
+   * @param peersReplicatedTo Set of peers already replicated to
+   * @param blockId BlockId of the block being replicated. This can be used as a source of
+   *randomness if needed.
+   * @return A prioritized list of peers. Lower the index of a peer, higher its priority
+   */
+  def prioritize(
+blockManagerId: BlockManagerId,
+peers: Seq[BlockManagerId],
+peersReplicatedTo: Set[BlockManagerId],
+blockId: BlockId): Seq[BlockManagerId]
+}
+
+@DeveloperApi
+class DefaultBlockReplicationPrioritization
+  extends BlockReplicationPrioritization
+  with Logging {
+
+  /**
+   * Method to prioritize a bunch of candidate peers of a block. This is a basic implementation,
+   * that just makes sure we put blocks on different hosts, if possible
+   *
+   * @param blockManagerId Id of the current BlockManager for self identification
+   * @param peers A list of peers of a BlockManager
+   * @param peersReplicatedTo Set of peers already replicated to
+   * @param blockId BlockId of the block being replicated. This can be used as a source of
+   *randomness if needed.
+   * @return A prioritized list of peers. Lower the index of a peer, higher its priority
+   */
+  override def prioritize(
+blockManagerId: BlockManagerId,
--- End diff --

so the Spark style for indentation is to have 4 spaces for function 
arguments, i.e.
```scala
override def prioritize(
    blockManagerId: BlockManagerId,
    peers: Seq[BlockManagerId],
    peersReplicatedTo: Set[BlockManagerId],
    blockId: BlockId): Seq[BlockManagerId] = {
  val random = new Random(blockId.hashCode)
  ...
}
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-18 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r75426070
  
--- Diff: core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.io.{File, FileInputStream}
+import java.util.Properties
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.internal.Logging
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/**
+ * ::DeveloperApi::
+ * TopologyMapper provides topology information for a given host
+ * @param conf SparkConf to get required properties, if needed
+ */
+@DeveloperApi
+abstract class TopologyMapper(conf: SparkConf) {
+  /**
+   * Gets the topology information given the host name
+   *
+   * @param hostname Hostname
+   * @return topology information for the given hostname. One can use a 'topology delimiter'
+   * to make this topology information nested.
+   * For example : ‘/myrack/myhost’, where ‘/’ is the topology delimiter,
+   * ‘myrack’ is the topology identifier, and ‘myhost’ is the individual host.
+   * This function only returns the topology information without the hostname.
--- End diff --

can you document what an empty string means?
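One way the doc could pin that down, sketched as a trivial mapper, on the assumption that the empty string means "no topology information available" (FlatTopologyMapper is a hypothetical name):

```scala
import org.apache.spark.SparkConf

// Every host is treated as being in the same unknown location, so callers
// should read "" as "topology unknown" rather than as a real rack name.
class FlatTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) {
  override def getTopologyForHost(hostname: String): String = ""
}
```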



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-12 Thread shubhamchopra
Github user shubhamchopra commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r74650320
  

[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-12 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r74634331
  

[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-12 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r74634093
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ---
@@ -138,9 +138,7 @@ class BlockManagerMasterEndpoint(
   }
   }
 
-  private def getTopologyInfoForHost(host: String): String = {
-topologyMapper.getTopologyForHost(host)
-  }
+  private def getTopologyInfoForHost(h: String): String = topologyMapper.getTopologyForHost(h)
--- End diff --

I meant inline the function into the caller. It only seems to be used in 
one place.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-09 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r74161999
  

[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-09 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r74161937
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -1088,109 +1108,88 @@ private[spark] class BlockManager(
   }
 
   /**
-   * Replicate block to another node. Not that this is a blocking call 
that returns after
+   * Replicate block to another node. Note that this is a blocking call 
that returns after
* the block has been replicated.
+   *
+   * @param blockId
+   * @param data
+   * @param level
+   * @param classTag
*/
   private def replicate(
-  blockId: BlockId,
-  data: ChunkedByteBuffer,
-  level: StorageLevel,
-  classTag: ClassTag[_]): Unit = {
+blockId: BlockId,
+data: ChunkedByteBuffer,
+level: StorageLevel,
+classTag: ClassTag[_]): Unit = {
+
 val maxReplicationFailures = 
conf.getInt("spark.storage.maxReplicationFailures", 1)
-val numPeersToReplicateTo = level.replication - 1
-val peersForReplication = new ArrayBuffer[BlockManagerId]
-val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
-val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
 val tLevel = StorageLevel(
   useDisk = level.useDisk,
   useMemory = level.useMemory,
   useOffHeap = level.useOffHeap,
   deserialized = level.deserialized,
   replication = 1)
+
+val numPeersToReplicateTo = level.replication - 1
+
 val startTime = System.currentTimeMillis
-val random = new Random(blockId.hashCode)
-
-var replicationFailed = false
-var failures = 0
-var done = false
-
-// Get cached list of peers
-peersForReplication ++= getPeers(forceFetch = false)
-
-// Get a random peer. Note that this selection of a peer is 
deterministic on the block id.
-// So assuming the list of peers does not change and no replication 
failures,
-// if there are multiple attempts in the same node to replicate the 
same block,
-// the same set of peers will be selected.
-def getRandomPeer(): Option[BlockManagerId] = {
-  // If replication had failed, then force update the cached list of 
peers and remove the peers
-  // that have been already used
-  if (replicationFailed) {
-peersForReplication.clear()
-peersForReplication ++= getPeers(forceFetch = true)
-peersForReplication --= peersReplicatedTo
-peersForReplication --= peersFailedToReplicateTo
-  }
-  if (!peersForReplication.isEmpty) {
-Some(peersForReplication(random.nextInt(peersForReplication.size)))
-  } else {
-None
-  }
-}
 
-// One by one choose a random peer and try uploading the block to it
-// If replication fails (e.g., target peer is down), force the list of 
cached peers
-// to be re-fetched from driver and then pick another random peer for 
replication. Also
-// temporarily black list the peer for which replication failed.
-//
-// This selection of a peer and replication is continued in a loop 
until one of the
-// following 3 conditions is fulfilled:
-// (i) specified number of peers have been replicated to
-// (ii) too many failures in replicating to peers
-// (iii) no peer left to replicate to
-//
-while (!done) {
-  getRandomPeer() match {
-case Some(peer) =>
-  try {
-val onePeerStartTime = System.currentTimeMillis
-logTrace(s"Trying to replicate $blockId of ${data.size} bytes 
to $peer")
-blockTransferService.uploadBlockSync(
-  peer.host,
-  peer.port,
-  peer.executorId,
-  blockId,
-  new NettyManagedBuffer(data.toNetty),
-  tLevel,
-  classTag)
-logTrace(s"Replicated $blockId of ${data.size} bytes to $peer 
in %s ms"
-  .format(System.currentTimeMillis - onePeerStartTime))
-peersReplicatedTo += peer
-peersForReplication -= peer
-replicationFailed = false
-if (peersReplicatedTo.size == numPeersToReplicateTo) {
-  done = true  // specified number of peers have been 
replicated to
-}
-  } catch {
-case e: Exception =>
-  logWarning(s"Failed to replicate $blockId to $peer, failure 
#$failures", e)
-  failures += 1
-  replicationFailed = true
-  peersFailedToReplicateTo += peer
-  if (failures > maxReplicationFailures) { 
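(The archived message is truncated here. Judging from the three exit conditions documented in the comment above, the tail of the removed loop continued schematically as below; this is a reconstruction, not quoted from the archive.)

    // condition (ii): too many failures in replicating to peers
    if (failures > maxReplicationFailures) {
      done = true
    }
    // ...
    case None =>
      done = true  // condition (iii): no peer left to replicate to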

[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-09 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r74161980
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---

[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-09 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r74161947
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---

[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-09 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r74161892
  
--- Diff: core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala 
---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.io.{File, FileInputStream}
+import java.util.Properties
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.internal.Logging
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/**
+ * ::DeveloperApi::
+ * TopologyMapper provides topology information for a given host
+ * @param conf SparkConf to get required properties, if needed
+ */
+@DeveloperApi
+abstract class TopologyMapper(conf: SparkConf) {
+  /**
+   * Gets the topology information given the host name
+   *
+   * @param hostname Hostname
+   * @return topology information for the given hostname. One can use a 
'topology delimiter'
+   * to make this topology information nested.
+   * For example : ‘/myrack/myhost’, where ‘/’ is the 
topology delimiter,
+   * ‘myrack’ is the topology identifier, and ‘myhost’ is 
the individual host.
+   * This function only returns the topology information without 
the hostname.
+   */
+  def getTopologyForHost(hostname: String): String
+}
+
+@DeveloperApi
+class DefaultTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) 
with Logging {
+  override def getTopologyForHost(hostname: String): String = {
+logDebug(s"Got a request for $hostname")
+"DefaultRack"
+  }
+}
+
+/**
+ * A simple file based topology mapper. This expects topology information 
provided as a
+ * [[java.util.Properties]] file. The name of the file is obtained from 
SparkConf property
+ * `spark.replication.topologyawareness.topologyfile`. To use this 
topology mapper, set the
--- End diff --

s/topologyfile/topologyFile
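For anyone wiring this up while following the thread: both properties quoted above are set on the SparkConf. A minimal sketch, assuming the key names exactly as they appear in this revision (the lowercase `topologyfile` is precisely what this comment flags):

    import org.apache.spark.SparkConf

    // Hypothetical setup; the file path is illustrative.
    val conf = new SparkConf()
      .set("spark.replication.topologyawareness.topologyMapper",
        "org.apache.spark.storage.FileBasedTopologyMapper")
      .set("spark.replication.topologyawareness.topologyfile",
        "/etc/spark/topology.properties")

    // topology.properties is a plain java.util.Properties file, e.g.:
    //   host-01 = /rack1
    //   host-02 = /rack2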





[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-09 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r74161782
  
--- Diff: core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala 
---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.io.{File, FileInputStream}
+import java.util.Properties
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.internal.Logging
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/**
+ * ::DeveloperApi::
+ * TopologyMapper provides topology information for a given host
+ * @param conf SparkConf to get required properties, if needed
+ */
+@DeveloperApi
+abstract class TopologyMapper(conf: SparkConf) {
+  /**
+   * Gets the topology information given the host name
+   *
+   * @param hostname Hostname
+   * @return topology information for the given hostname. One can use a 
'topology delimiter'
+   * to make this topology information nested.
+   * For example : ‘/myrack/myhost’, where ‘/’ is the 
topology delimiter,
+   * ‘myrack’ is the topology identifier, and ‘myhost’ is 
the individual host.
+   * This function only returns the topology information without 
the hostname.
+   */
+  def getTopologyForHost(hostname: String): String
+}
+
+@DeveloperApi
+class DefaultTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) 
with Logging {
+  override def getTopologyForHost(hostname: String): String = {
+logDebug(s"Got a request for $hostname")
+"DefaultRack"
+  }
+}
+
+/**
+ * A simple file based topology mapper. This expects topology information 
provided as a
+ * [[java.util.Properties]] file. The name of the file is obtained from 
SparkConf property
+ * `spark.replication.topologyawareness.topologyfile`. To use this 
topology mapper, set the
+ * `spark.replication.topologyawareness.topologyMapper` property to
+ * [[org.apache.spark.storage.FileBasedTopologyMapper]]
+ * @param conf SparkConf object
+ */
+@DeveloperApi
+class FileBasedTopologyMapper(conf: SparkConf) extends 
TopologyMapper(conf) with Logging {
+
+  val topologyFile = 
conf.getOption("spark.replication.topologyawareness.topologyfile")
+  require(topologyFile.isDefined, "Please provide topology file for 
FileBasedTopologyMapper.")
--- End diff --

newline
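As a sketch of the TopologyMapper contract quoted above: a custom implementation only needs a constructor taking a SparkConf and a `getTopologyForHost` override. The class below is hypothetical (the hostname convention is assumed) and matches the String-returning signature of this revision:

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.TopologyMapper

    // Hosts named like "rack1-node07" map to "/rack1"; anything else
    // falls back to a default rack.
    class HostnamePrefixTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) {
      override def getTopologyForHost(hostname: String): String =
        hostname.split("-").headOption match {
          case Some(rack) if rack.startsWith("rack") => s"/$rack"
          case _ => "/default-rack"
        }
    }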





[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-09 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r74161774
  
--- Diff: core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala 
---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.io.{File, FileInputStream}
+import java.util.Properties
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.internal.Logging
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/**
+ * ::DeveloperApi::
+ * TopologyMapper provides topology information for a given host
+ * @param conf SparkConf to get required properties, if needed
+ */
+@DeveloperApi
+abstract class TopologyMapper(conf: SparkConf) {
+  /**
+   * Gets the topology information given the host name
+   *
+   * @param hostname Hostname
+   * @return topology information for the given hostname. One can use a 
'topology delimiter'
+   * to make this topology information nested.
+   * For example : ‘/myrack/myhost’, where ‘/’ is the 
topology delimiter,
+   * ‘myrack’ is the topology identifier, and ‘myhost’ is 
the individual host.
+   * This function only returns the topology information without 
the hostname.
+   */
+  def getTopologyForHost(hostname: String): String
+}
+
+@DeveloperApi
+class DefaultTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) 
with Logging {
+  override def getTopologyForHost(hostname: String): String = {
+logDebug(s"Got a request for $hostname")
+"DefaultRack"
+  }
+}
+
+/**
+ * A simple file based topology mapper. This expects topology information 
provided as a
+ * [[java.util.Properties]] file. The name of the file is obtained from 
SparkConf property
+ * `spark.replication.topologyawareness.topologyfile`. To use this 
topology mapper, set the
+ * `spark.replication.topologyawareness.topologyMapper` property to
+ * [[org.apache.spark.storage.FileBasedTopologyMapper]]
+ * @param conf SparkConf object
+ */
+@DeveloperApi
+class FileBasedTopologyMapper(conf: SparkConf) extends 
TopologyMapper(conf) with Logging {
+
+  val topologyFile = 
conf.getOption("spark.replication.topologyawareness.topologyfile")
+  require(topologyFile.isDefined, "Please provide topology file for 
FileBasedTopologyMapper.")
+  val topologyMap = Utils.getPropertiesFromFile(topologyFile.get)
+
+  override def getTopologyForHost(hostname: String): String = {
+val topology = topologyMap.get(hostname)
+if(topology.isDefined) {
--- End diff --

space after the if
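Beyond the missing space, the Option test can be collapsed into a single expression; a sketch of the more idiomatic form (the fallback value is assumed, since the else-branch is cut off in the quote):

    override def getTopologyForHost(hostname: String): String =
      topologyMap.get(hostname).getOrElse {
        logDebug(s"Could not find topology information for $hostname")
        "DefaultRack"  // assumed fallback
      }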





[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-09 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r74161515
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala
 ---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import scala.util.Random
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.internal.Logging
+
+/**
+ * ::DeveloperApi::
+ * BlockReplicationPrioritization provides logic for prioritizing a 
sequence of peers for
+ * replicating blocks
--- End diff --

add: BlockManager will replicate to each peer returned in order until the 
desired replication order is reached. If a replication fails, prioritize() will 
be called again to get a fresh prioritization.
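A minimal sketch of the contract that suggested doc describes, matching the trait as quoted in this message (seeding on the block id mirrors the replicate() code elsewhere in the thread, so repeated calls on the same node order peers stably until a failure forces a refresh):

    import scala.util.Random

    override def prioritize(
        blockManagerId: BlockManagerId,
        peers: Seq[BlockManagerId],
        peersReplicatedTo: Set[BlockManagerId],
        blockId: BlockId): Seq[BlockManagerId] = {
      // Drop peers already holding the block, then shuffle the rest,
      // seeded on the block id for deterministic retries.
      val random = new Random(blockId.hashCode)
      random.shuffle(peers.filterNot(peersReplicatedTo.contains))
    }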





[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-09 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r74161348
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala 
---
@@ -126,6 +138,10 @@ class BlockManagerMasterEndpoint(
   }
   }
 
+  private def getTopologyInfoForHost(host: String): String = {
--- End diff --

inline?





[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-05 Thread shubhamchopra
Github user shubhamchopra commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r73751665
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---

[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-04 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r73615717
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala
 ---
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import scala.util.Random
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.internal.Logging
+
+/**
+ * ::DeveloperApi::
+ * BlockReplicationPrioritization provides logic for prioritizing a 
sequence of peers for
+ * replicating blocks
+ */
+@DeveloperApi
+trait BlockReplicationPrioritization {
+
+  /**
+   * Method to prioritize a bunch of candidate peers of a block
+   *
+   * @param blockManagerId Id of the current BlockManager for self 
identification
+   * @param peers A list of peers of a BlockManager
+   * @param peersReplicatedTo Set of peers already replicated to
+   * @param blockId BlockId of the block being replicated. This can be 
used as a source of
+   *randomness if needed.
+   * @return A prioritized list of peers. Lower the index of a peer, 
higher its priority
+   */
+  def prioritize(blockManagerId: BlockManagerId,
+peers: Seq[BlockManagerId],
+peersReplicatedTo: Set[BlockManagerId],
+blockId: BlockId): Seq[BlockManagerId]
+}
+
+@DeveloperApi
+class DefaultBlockReplicationPrioritization
+  extends BlockReplicationPrioritization
+  with Logging {
+
+  /**
+   * Method to prioritize a bunch of candidate peers of a block. This is a 
basic implementation,
+   * that just makes sure we put blocks on different hosts, if possible
+   *
+   * @param blockManagerId Id of the current BlockManager for self 
identification
+   * @param peers A list of peers of a BlockManager
+   * @param peersReplicatedTo Set of peers already replicated to
+   * @param blockId BlockId of the block being replicated. This can be 
used as a source of
+   *randomness if needed.
+   * @return A prioritized list of peers. Lower the index of a peer, 
higher its priority
+   */
+  override def prioritize(blockManagerId: BlockManagerId,
--- End diff --

style





[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-04 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r73615714
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala
 ---
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import scala.util.Random
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.internal.Logging
+
+/**
+ * ::DeveloperApi::
+ * BlockReplicationPrioritization provides logic for prioritizing a 
sequence of peers for
+ * replicating blocks
+ */
+@DeveloperApi
+trait BlockReplicationPrioritization {
+
+  /**
+   * Method to prioritize a bunch of candidate peers of a block
+   *
+   * @param blockManagerId Id of the current BlockManager for self 
identification
+   * @param peers A list of peers of a BlockManager
+   * @param peersReplicatedTo Set of peers already replicated to
+   * @param blockId BlockId of the block being replicated. This can be 
used as a source of
+   *randomness if needed.
+   * @return A prioritized list of peers. Lower the index of a peer, 
higher its priority
+   */
+  def prioritize(blockManagerId: BlockManagerId,
--- End diff --

style





[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-04 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r73615629
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala ---
@@ -111,4 +111,6 @@ private[spark] object BlockManagerMessages {
   case class BlockManagerHeartbeat(blockManagerId: BlockManagerId) extends 
ToBlockManagerMaster
 
   case class HasCachedBlocks(executorId: String) extends 
ToBlockManagerMaster
+
+  case class GetTopologyInfo(host: String) extends ToBlockManagerMaster
--- End diff --

Is this message still used?





[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-04 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r73615313
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala 
---
@@ -103,8 +121,11 @@ private[spark] object BlockManagerId {
* @param port Port of the block manager.
* @return A new [[org.apache.spark.storage.BlockManagerId]].
*/
-  def apply(execId: String, host: String, port: Int): BlockManagerId =
-getCachedBlockManagerId(new BlockManagerId(execId, host, port))
+  def apply(execId: String,
--- End diff --

The style here is inconsistent, can you move execId to the next line?
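For reference, the continuation style being asked for puts each parameter on its own line, double-indented; a sketch assuming the optional topologyInfo parameter this PR threads through:

    def apply(
        execId: String,
        host: String,
        port: Int,
        topologyInfo: Option[String] = None): BlockManagerId =
      getCachedBlockManagerId(new BlockManagerId(execId, host, port, topologyInfo))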





[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-04 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r73615368
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala ---
@@ -50,12 +50,20 @@ class BlockManagerMaster(
 logInfo("Removal of executor " + execId + " requested")
   }
 
-  /** Register the BlockManager's id with the driver. */
+  /** Register the BlockManager's id with the driver. The input 
BlockManagerId does not contain
+   * topology information. This information is obtained from the master 
and we respond with an
+   * updated BlockManagerId fleshed out with this information.
+   *
+   */
   def registerBlockManager(
-  blockManagerId: BlockManagerId, maxMemSize: Long, slaveEndpoint: 
RpcEndpointRef): Unit = {
+blockManagerId: BlockManagerId,
--- End diff --

There is also a style issue here





[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-04 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r73615208
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala ---
@@ -50,12 +50,20 @@ class BlockManagerMaster(
 logInfo("Removal of executor " + execId + " requested")
   }
 
-  /** Register the BlockManager's id with the driver. */
+  /** Register the BlockManager's id with the driver. The input 
BlockManagerId does not contain
--- End diff --

use javadoc style here
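That is, the opening `/**` goes on its own line with the prose starting on the next:

    /**
     * Register the BlockManager's id with the driver. The input BlockManagerId does
     * not contain topology information. This information is obtained from the master
     * and we respond with an updated BlockManagerId fleshed out with this information.
     */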





[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-04 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r73615218
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala ---
@@ -50,12 +50,20 @@ class BlockManagerMaster(
 logInfo("Removal of executor " + execId + " requested")
   }
 
-  /** Register the BlockManager's id with the driver. */
+  /** Register the BlockManager's id with the driver. The input 
BlockManagerId does not contain
+   * topology information. This information is obtained from the master 
and we respond with an
+   * updated BlockManagerId fleshed out with this information.
+   *
--- End diff --

extra newline





[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-04 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r73615142
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala 
---
@@ -69,24 +72,39 @@ class BlockManagerId private (
 out.writeUTF(executorId_)
 out.writeUTF(host_)
 out.writeInt(port_)
+out.writeBoolean(topologyInfo_.isDefined)
+// if we don't keep topology information, we just write an empty 
string.
+out.writeUTF(topologyInfo_.getOrElse(""))
--- End diff --

You can just omit this write if it's missing?
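Sketched out, the suggestion gates the payload on the flag in both directions rather than writing an empty string for the None case (the helper names here are hypothetical):

    import java.io.{ObjectInput, ObjectOutput}

    def writeTopology(out: ObjectOutput, topologyInfo: Option[String]): Unit = {
      out.writeBoolean(topologyInfo.isDefined)
      topologyInfo.foreach(out.writeUTF)  // omitted entirely when absent
    }

    def readTopology(in: ObjectInput): Option[String] =
      if (in.readBoolean()) Some(in.readUTF()) else None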





[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-04 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r73614814
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---

[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-08-04 Thread shubhamchopra
Github user shubhamchopra commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r73593762
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -159,8 +162,27 @@ private[spark] class BlockManager(
 blockTransferService.init(this)
 shuffleClient.init(appId)
 
+val topologyInfo = {
+  val topologyStr = 
master.getTopologyInfo(blockTransferService.hostName)
+  if (topologyStr == null || topologyStr.isEmpty) {
+None
+  } else {
+Some(topologyStr)
+  }
+}
+
+blockReplicationPrioritizer = {
+  val priorityClass = conf.get(
+"spark.replication.topologyawareness.prioritizer",
+"org.apache.spark.storage.DefaultBlockReplicationPrioritization")
+  val clazz = Utils.classForName(priorityClass)
+  val ret = 
clazz.newInstance.asInstanceOf[BlockReplicationPrioritization]
+  logInfo(s"Using $priorityClass for prioritizing peers")
+  ret
+}
+
 blockManagerId = BlockManagerId(
-  executorId, blockTransferService.hostName, blockTransferService.port)
+  executorId, blockTransferService.hostName, 
blockTransferService.port, topologyInfo)
--- End diff --

I implemented this in the commits below. 
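As an aside on the reflective hook quoted above: because the prioritizer is built via `Utils.classForName(...).newInstance`, a plug-in class needs a zero-argument constructor. The user-facing side, with a hypothetical class name:

    import org.apache.spark.SparkConf

    // com.example.storage.RackAwarePrioritization is illustrative; it must
    // extend BlockReplicationPrioritization and have a no-arg constructor.
    val conf = new SparkConf()
      .set("spark.replication.topologyawareness.prioritizer",
        "com.example.storage.RackAwarePrioritization")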





[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

2016-07-28 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/13152#discussion_r72685380
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -159,8 +162,27 @@ private[spark] class BlockManager(
 blockTransferService.init(this)
 shuffleClient.init(appId)
 
+val topologyInfo = {
+  val topologyStr = 
master.getTopologyInfo(blockTransferService.hostName)
+  if (topologyStr == null || topologyStr.isEmpty) {
+None
+  } else {
+Some(topologyStr)
+  }
+}
+
+blockReplicationPrioritizer = {
+  val priorityClass = conf.get(
+"spark.replication.topologyawareness.prioritizer",
+"org.apache.spark.storage.DefaultBlockReplicationPrioritization")
+  val clazz = Utils.classForName(priorityClass)
+  val ret = 
clazz.newInstance.asInstanceOf[BlockReplicationPrioritization]
+  logInfo(s"Using $priorityClass for prioritizing peers")
+  ret
+}
+
 blockManagerId = BlockManagerId(
-  executorId, blockTransferService.hostName, blockTransferService.port)
+  executorId, blockTransferService.hostName, 
blockTransferService.port, topologyInfo)
--- End diff --

Would it work if topologyInfo was sent back from the master when 
`registerBlockManager` is called? It doesn't seem that anything uses 
`blockManagerId` until registration finishes. That way we wouldn't need this 
weird two-step registration.
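A sketch of the shape of that suggestion (method and field names are assumed, not quoted from the PR): the master decorates the incoming id and returns it, so the executor learns its topology in the registration round trip:

    // Driver side of registerBlockManager.
    private def register(
        idWithoutTopologyInfo: BlockManagerId,
        maxMemSize: Long,
        slaveEndpoint: RpcEndpointRef): BlockManagerId = {
      val id = BlockManagerId(
        idWithoutTopologyInfo.executorId,
        idWithoutTopologyInfo.host,
        idWithoutTopologyInfo.port,
        Some(topologyMapper.getTopologyForHost(idWithoutTopologyInfo.host)))
      // ... existing registration bookkeeping ...
      id
    }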

