HeartSaVioR commented on a change in pull request #23790: [SPARK-26792][CORE]
Apply custom log URL to Spark UI
URL: https://github.com/apache/spark/pull/23790#discussion_r261427828
##########
File path:
core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
##########
@@ -120,6 +124,63 @@ class CoarseGrainedSchedulerBackendSuite extends
SparkFunSuite with LocalSparkCo
}
}
+ // Here we just have test for one happy case instead of all cases: other
cases are covered in
+ // FsHistoryProviderSuite.
+ test("custom log url for Spark UI is applied") {
+ val customExecutorLogUrl =
"http://newhost:9999/logs/clusters/{{CLUSTER_ID}}/users/{{USER}}" +
+ "/containers/{{CONTAINER_ID}}/{{FILE_NAME}}"
+
+ val conf = new SparkConf()
+ .set(CPUS_PER_TASK, 2)
+ .set(UI.CUSTOM_EXECUTOR_LOG_URL, customExecutorLogUrl)
+ .setMaster("local-cluster[0, 3, 1024]")
+ .setAppName("test")
+
+ sc = new SparkContext(conf)
+ val backend =
sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend]
+ val mockEndpointRef = mock[RpcEndpointRef]
+ val mockAddress = mock[RpcAddress]
+
+ val logUrls = Map(
+ "stdout" -> "http://oldhost:8888/logs/dummy/stdout",
+ "stderr" -> "http://oldhost:8888/logs/dummy/stderr")
+ val attributes = Map(
+ "CLUSTER_ID" -> "cl1",
+ "USER" -> "dummy",
+ "CONTAINER_ID" -> "container1",
+ "LOG_FILES" -> "stdout,stderr")
+
+ var executorAddedCount: Int = 0
+ val listener = new SparkListener() {
+ override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded):
Unit = {
+ executorAddedCount += 1
+ assert(executorAdded.executorInfo.logUrlMap === Seq("stdout",
"stderr").map { file =>
+ file -> getExpectedCustomExecutorLogUrl(attributes, Some(file))
+ }.toMap)
+ }
+ }
+
+ try {
+ sc.addSparkListener(listener)
+
+ backend.driverEndpoint.askSync[Boolean](
+ RegisterExecutor("1", mockEndpointRef, mockAddress.host, 1, logUrls,
attributes))
+ backend.driverEndpoint.askSync[Boolean](
+ RegisterExecutor("2", mockEndpointRef, mockAddress.host, 1, logUrls,
attributes))
+ backend.driverEndpoint.askSync[Boolean](
+ RegisterExecutor("3", mockEndpointRef, mockAddress.host, 1, logUrls,
attributes))
+
+ eventually(timeout(executorUpTimeout)) {
+ // Ensure all executors have been launched.
+ assert(sc.getExecutorIds().length == 3)
+ }
+
+ assert(executorAddedCount === 3)
+ } finally {
+ sc.removeSparkListener(listener)
Review comment:
I would call it "defensive programming", as we already have it in the
test above. I would feel safer following the pattern the existing code has —
which has already been reviewed — rather than changing it, since keeping it doesn't hurt.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]