attilapiros commented on a change in pull request #30164:
URL: https://github.com/apache/spark/pull/30164#discussion_r524292900
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -657,6 +688,43 @@ class BlockManagerMasterEndpoint(
}
}
+  private def getShufflePushMergerLocations(
+      numMergersNeeded: Int,
+      hostsToFilter: Set[String]): Seq[BlockManagerId] = {
+    val blockManagersWithExecutors = blockManagerIdByExecutor.groupBy(_._2.host)
+      .mapValues(_.head).values.map(_._2).toSet
+    val filteredBlockManagersWithExecutors = blockManagersWithExecutors
+      .filterNot(x => hostsToFilter.contains(x.host))
+    val filteredMergersWithExecutors = filteredBlockManagersWithExecutors.map(
+      x => BlockManagerId(x.executorId, x.host, StorageUtils.externalShuffleServicePort(conf)))
Review comment:
Will the `executorId` part of a shuffle push merger location ever be used?
(I do not think so, but if it is, be aware that `mapValues(_.head)` on a
HashMap will not produce a deterministic result.)
If not, the lines above can be simplified a lot, for example by introducing a
constant ID for shuffle push merger locations. That might also help with
debugging later.
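Just to illustrate what I mean, a standalone sketch with stand-in types (not a
patch for this PR; `SHUFFLE_MERGER_IDENTIFIER` is a hypothetical name and the
`numMergersNeeded` handling is left out):

```scala
// Stand-in type, only to keep the sketch self-contained.
case class BlockManagerId(executorId: String, host: String, port: Int)

object ShufflePushMergerSketch {
  // Hypothetical constant executor id for all merger locations.
  val SHUFFLE_MERGER_IDENTIFIER = "shuffle-push-merger"

  def mergerLocations(
      blockManagerIdByExecutor: Map[String, BlockManagerId],
      hostsToFilter: Set[String],
      externalShuffleServicePort: Int): Seq[BlockManagerId] = {
    blockManagerIdByExecutor.values
      .map(_.host)          // only the host matters for a merger location
      .toSet                // de-duplicate hosts, no `.head` on a HashMap group needed
      .diff(hostsToFilter)
      .map(host => BlockManagerId(SHUFFLE_MERGER_IDENTIFIER, host, externalShuffleServicePort))
      .toSeq
  }
}
```

With a constant ID the `groupBy`/`mapValues(_.head)` dance disappears and the
result no longer depends on which executor happens to come first per host.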