Repository: spark Updated Branches: refs/heads/master 4883a5087 -> 935f46630
[SPARK-12392][CORE] Optimize a location order of broadcast blocks by considering preferred local hosts When multiple workers run on the same host, unnecessary remote accesses for broadcasts can be avoided: block managers now try to fetch broadcast blocks from block managers on the same host before falling back to remote hosts. Author: Takeshi YAMAMURO <linguin....@gmail.com> Closes #10346 from maropu/OptimizeBlockLocationOrder. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/935f4663 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/935f4663 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/935f4663 Branch: refs/heads/master Commit: 935f46630685306edbdec91f71710703317fe129 Parents: 4883a50 Author: Takeshi YAMAMURO <linguin....@gmail.com> Authored: Mon Dec 21 14:02:40 2015 -0800 Committer: Andrew Or <and...@databricks.com> Committed: Mon Dec 21 14:03:23 2015 -0800 ---------------------------------------------------------------------- .../org/apache/spark/storage/BlockManager.scala | 12 +++++++++++- .../apache/spark/storage/BlockManagerSuite.scala | 19 ++++++++++++++++++- 2 files changed, 29 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/935f4663/core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 6074fc5..b5b7804 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -578,9 +578,19 @@ private[spark] class BlockManager( doGetRemote(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]] } + /** + * Return a list of locations for the given block, prioritizing the local machine since + * multiple block 
managers can share the same host. + */ + private def getLocations(blockId: BlockId): Seq[BlockManagerId] = { + val locs = Random.shuffle(master.getLocations(blockId)) + val (preferredLocs, otherLocs) = locs.partition { loc => blockManagerId.host == loc.host } + preferredLocs ++ otherLocs + } + private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = { require(blockId != null, "BlockId is null") - val locations = Random.shuffle(master.getLocations(blockId)) + val locations = getLocations(blockId) var numFetchFailures = 0 for (loc <- locations) { logDebug(s"Getting remote block $blockId from $loc") http://git-wip-us.apache.org/repos/asf/spark/blob/935f4663/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 53991d8..bf49be3 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -26,6 +26,7 @@ import scala.language.implicitConversions import scala.language.postfixOps import org.mockito.Mockito.{mock, when} +import org.mockito.{Matchers => mc} import org.scalatest._ import org.scalatest.concurrent.Eventually._ import org.scalatest.concurrent.Timeouts._ @@ -66,7 +67,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE private def makeBlockManager( maxMem: Long, - name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = { + name: String = SparkContext.DRIVER_IDENTIFIER, + master: BlockManagerMaster = this.master): BlockManager = { val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1) val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1) val blockManager = new BlockManager(name, rpcEnv, master, serializer, conf, 
@@ -451,6 +453,21 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(list2DiskGet.get.readMethod === DataReadMethod.Disk) } + test("optimize a location order of blocks") { + val localHost = Utils.localHostName() + val otherHost = "otherHost" + val bmMaster = mock(classOf[BlockManagerMaster]) + val bmId1 = BlockManagerId("id1", localHost, 1) + val bmId2 = BlockManagerId("id2", localHost, 2) + val bmId3 = BlockManagerId("id3", otherHost, 3) + when(bmMaster.getLocations(mc.any[BlockId])).thenReturn(Seq(bmId1, bmId2, bmId3)) + + val blockManager = makeBlockManager(128, "exec", bmMaster) + val getLocations = PrivateMethod[Seq[BlockManagerId]]('getLocations) + val locations = blockManager invokePrivate getLocations(BroadcastBlockId(0)) + assert(locations.map(_.host) === Seq(localHost, localHost, otherHost)) + } + test("SPARK-9591: getRemoteBytes from another location when Exception throw") { val origTimeoutOpt = conf.getOption("spark.network.timeout") try { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org