vanzin commented on a change in pull request #23790: [SPARK-26792][CORE] Apply
custom log URL to Spark UI
URL: https://github.com/apache/spark/pull/23790#discussion_r261352559
##########
File path:
core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
##########
@@ -120,6 +124,63 @@ class CoarseGrainedSchedulerBackendSuite extends
SparkFunSuite with LocalSparkCo
}
}
+ // Here we just have a test for one happy case instead of all cases: other
cases are covered in
+ // FsHistoryProviderSuite.
+ test("custom log url for Spark UI is applied") {
+ val customExecutorLogUrl =
"http://newhost:9999/logs/clusters/{{CLUSTER_ID}}/users/{{USER}}" +
+ "/containers/{{CONTAINER_ID}}/{{FILE_NAME}}"
+
+ val conf = new SparkConf()
+ .set(CPUS_PER_TASK, 2)
+ .set(UI.CUSTOM_EXECUTOR_LOG_URL, customExecutorLogUrl)
+ .setMaster("local-cluster[0, 3, 1024]")
+ .setAppName("test")
+
+ sc = new SparkContext(conf)
+ val backend =
sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend]
+ val mockEndpointRef = mock[RpcEndpointRef]
+ val mockAddress = mock[RpcAddress]
+
+ val logUrls = Map(
+ "stdout" -> "http://oldhost:8888/logs/dummy/stdout",
+ "stderr" -> "http://oldhost:8888/logs/dummy/stderr")
+ val attributes = Map(
+ "CLUSTER_ID" -> "cl1",
+ "USER" -> "dummy",
+ "CONTAINER_ID" -> "container1",
+ "LOG_FILES" -> "stdout,stderr")
+
+ var executorAddedCount: Int = 0
+ val listener = new SparkListener() {
+ override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded):
Unit = {
+ executorAddedCount += 1
+ assert(executorAdded.executorInfo.logUrlMap === Seq("stdout",
"stderr").map { file =>
+ file -> getExpectedCustomExecutorLogUrl(attributes, Some(file))
+ }.toMap)
+ }
+ }
+
+ try {
+ sc.addSparkListener(listener)
+
+ backend.driverEndpoint.askSync[Boolean](
+ RegisterExecutor("1", mockEndpointRef, mockAddress.host, 1, logUrls,
attributes))
+ backend.driverEndpoint.askSync[Boolean](
+ RegisterExecutor("2", mockEndpointRef, mockAddress.host, 1, logUrls,
attributes))
+ backend.driverEndpoint.askSync[Boolean](
+ RegisterExecutor("3", mockEndpointRef, mockAddress.host, 1, logUrls,
attributes))
+
+ eventually(timeout(executorUpTimeout)) {
+ // Ensure all executors have been launched.
+ assert(sc.getExecutorIds().length == 3)
+ }
+
+ assert(executorAddedCount === 3)
Review comment:
You need to call `sc.listenerBus.waitUntilEmpty` before this assertion; listener events are delivered asynchronously, so without it the test will be flaky.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]