LantaoJin commented on a change in pull request #23951:
[SPARK-27038][CORE][YARN] Re-implement RackResolver to reduce resolving time
URL: https://github.com/apache/spark/pull/23951#discussion_r263655647
##########
File path: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
##########
@@ -69,18 +69,48 @@ class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
 // Get the rack for a given host
 object FakeRackUtil {
   private val hostToRack = new mutable.HashMap[String, String]()
+  var loopCount = 0
   def cleanUp() {
     hostToRack.clear()
+    loopCount = 0
   }
   def assignHostToRack(host: String, rack: String) {
     hostToRack(host) = rack
   }
   def getRackForHost(host: String): Option[String] = {
+    loopCount = simulateRunResolveCommand(Seq(host))
     hostToRack.get(host)
   }
+
+  def getRacksForHosts(hosts: List[String]): List[Option[String]] = {
+    loopCount = simulateRunResolveCommand(hosts)
+    hosts.map(hostToRack.get)
+  }
+
+  /**
+   * This is a simulation of building and executing the resolution command.
+   * Simulates `runResolveCommand()` in [[org.apache.hadoop.net.ScriptBasedMapping]].
+   * If the Seq has 100 elements, it returns 4; if it has 1 element, it returns 1.
+   * @param args a list of arguments
+   * @return the number of script executions
+   */
+  private def simulateRunResolveCommand(args: Seq[String]): Int = {
+    val maxArgs = 30 // Simulate NET_TOPOLOGY_SCRIPT_NUMBER_ARGS_DEFAULT
+    var numProcessed = 0
+    var loopCount = 0
+    while (numProcessed != args.size) {
+      val start = maxArgs * loopCount
+      numProcessed = start
+      while (numProcessed < (start + maxArgs) && numProcessed < args.size) {
+        numProcessed += 1
+      }
+      loopCount += 1
+    }
+    loopCount
Review comment:
I didn't improve performance via (2); it still uses the cache provided by Hadoop itself. The reason I added this complex code to the unit test is that after this patch is applied, the initialization of `TaskSetManager` no longer invokes `getRackForHost()`, so there is no longer a place to compare the invocation count of `getRackForHost()` against that of `getRacksForHosts()`. Instead, I added this simulator to count how many times the script is executed. If this patch were reverted, the `assert(FakeRackUtil.loopCount === 4)` would fail.
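To make the assertion concrete, here is a minimal sketch of driving the simulator (the host names are hypothetical): one batched lookup of 100 hosts with a batch size of 30 simulates ceil(100 / 30) = 4 script executions, whereas 100 per-host `getRackForHost()` calls would each simulate a separate execution.
```scala
FakeRackUtil.cleanUp()
val hosts = (1 to 100).map(i => s"host$i").toList
hosts.foreach(host => FakeRackUtil.assignHostToRack(host, "/rack1"))
// One batched call resolves all 100 hosts in ceil(100 / 30) = 4 batches.
FakeRackUtil.getRacksForHosts(hosts)
assert(FakeRackUtil.loopCount === 4)
```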
For (3), `getRacksForHosts` is always invoked, whether `skipRackResolving` is true or false:
```scala
override def getRacksForHosts(hostPorts: List[String]): List[Option[String]] = {
  val hosts = hostPorts.map(Utils.parseHostPort(_)._1)
  if (skipRackResolving) {
    hosts.map(_ => Option(NetworkTopology.DEFAULT_RACK))
  } else {
    SparkRackResolver.resolveRacks(sc.hadoopConfiguration, hosts).map { node =>
      Option(node.getNetworkLocation)
    }
  }
}
```
Similarly, this can be verified by checking the script execution count.
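As a hedged sketch of that check (assuming the suite's fake scheduler delegates rack lookups to `FakeRackUtil` and that `skipRackResolving` is settable on it; the `sched` name and the host:port values below are hypothetical):
```scala
import org.apache.hadoop.net.NetworkTopology

FakeRackUtil.cleanUp()
sched.skipRackResolving = true
val racks = sched.getRacksForHosts(List("host1:7077", "host2:7077"))
// The short-circuit path maps every host to the default rack...
assert(racks.forall(_ == Option(NetworkTopology.DEFAULT_RACK)))
// ...without ever executing the resolve script.
assert(FakeRackUtil.loopCount === 0)
```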