vanzin commented on a change in pull request #23260: [SPARK-26311][CORE] New
feature: apply custom log URL pattern for executor log URLs in SHS
URL: https://github.com/apache/spark/pull/23260#discussion_r250014080
##########
File path:
core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
##########
@@ -291,6 +292,159 @@ class FsHistoryProviderSuite extends SparkFunSuite with
BeforeAndAfter with Matc
}
}
test("Handling executor log url without renewing") {
  // No custom log URL pattern is configured, so every executor is expected to
  // keep exactly the log URLs it was registered with.
  val conf = createTestConf()
  val appId = "app1"
  val user = "user1"

  val executorInfos = (1 to 5).map(createTestExecutorInfo(appId, user, _))

  val expected: Map[ExecutorInfo, Map[String, String]] =
    executorInfos.map(info => info -> info.logUrlMap).toMap

  testHandlingExecutorLogUrl(conf, expected)
}
+
test("Handling executor log url with custom executor url - happy case") {
  // The pattern uses only a subset of the available attributes, which should be OK.
  val conf = createTestConf()
    .set(CUSTOM_EXECUTOR_LOG_URL, "http://newhost:9999/logs/clusters/{{CLUSTER_ID}}" +
      "/users/{{USER}}/containers/{{CONTAINER_ID}}/{{FILE_NAME}}")

  val appId = "app1"
  val user = "user1"

  val executorInfos = (1 to 5).map(createTestExecutorInfo(appId, user, _))

  // Each executor's log URLs should be rewritten per the pattern, with one entry
  // per file name listed in the executor's LOG_FILES attribute.
  val expected: Map[ExecutorInfo, Map[String, String]] = executorInfos.map { execInfo =>
    val attrs = execInfo.attributes
    val rewrittenUrls = attrs("LOG_FILES").split(",").map { fileName =>
      fileName -> (s"http://newhost:9999/logs/clusters/${attrs("CLUSTER_ID")}" +
        s"/users/${attrs("USER")}/containers/${attrs("CONTAINER_ID")}/$fileName")
    }.toMap
    execInfo -> rewrittenUrls
  }.toMap

  testHandlingExecutorLogUrl(conf, expected)
}
+
test("Handling executor log url with custom executor url - happy case - " +
  "pattern doesn't contain 'FILE_NAME'") {
  // The pattern uses only a subset of the available attributes, which should be OK.
  val conf = createTestConf()
    .set(CUSTOM_EXECUTOR_LOG_URL, "http://newhost:9999/logs/clusters/{{CLUSTER_ID}}" +
      "/users/{{USER}}/containers/{{CONTAINER_ID}}")

  val appId = "app1"
  val user = "user1"

  val executorInfos = (1 to 5).map(createTestExecutorInfo(appId, user, _))

  // Since the pattern has no {{FILE_NAME}} placeholder, a single "log" entry
  // pointing at the rewritten URL is expected for each executor.
  val expected: Map[ExecutorInfo, Map[String, String]] = executorInfos.map { execInfo =>
    val attrs = execInfo.attributes
    val rewrittenUrl = s"http://newhost:9999/logs/clusters/${attrs("CLUSTER_ID")}" +
      s"/users/${attrs("USER")}/containers/${attrs("CONTAINER_ID")}"
    execInfo -> Map("log" -> rewrittenUrl)
  }.toMap

  testHandlingExecutorLogUrl(conf, expected)
}
+
test("Handling executor log url with custom executor url - bad case - " +
  "referring non-available attribute") {
  // The pattern references {{NON_EXISTING}}, which is not present in the executors'
  // attributes; Spark is expected to fall back to the original log URLs (logging a
  // warning).
  val conf = createTestConf()
    .set(CUSTOM_EXECUTOR_LOG_URL, "http://newhost:9999/logs/clusters/{{CLUSTER_ID}}" +
      "/users/{{USER}}/containers/{{CONTAINER_ID}}/{{NON_EXISTING}}/{{FILE_NAME}}")

  val appId = "app1"
  val user = "user1"

  val executorInfos = (1 to 5).map(createTestExecutorInfo(appId, user, _))

  // Fallback means the original log URL maps are returned unchanged.
  val expected: Map[ExecutorInfo, Map[String, String]] =
    executorInfos.map(info => info -> info.logUrlMap).toMap

  testHandlingExecutorLogUrl(conf, expected)
}
+
test("Handling executor log url with custom executor url - bad case - " +
  "'FILE_NAME' is given for pattern but 'LOG_FILES' attribute is not available") {
  // The pattern requires {{FILE_NAME}}, but the executors are created without the
  // LOG_FILES attribute, so Spark is expected to fall back to the original log URLs
  // (logging a warning).
  //
  // NOTE: the pattern here must reference only attributes that DO exist — unlike the
  // previous test it must not contain {{NON_EXISTING}} — so that the fallback can only
  // be triggered by the missing LOG_FILES attribute, which is the condition under test.
  val conf = createTestConf()
    .set(CUSTOM_EXECUTOR_LOG_URL, "http://newhost:9999/logs/clusters/{{CLUSTER_ID}}" +
      "/users/{{USER}}/containers/{{CONTAINER_ID}}/{{FILE_NAME}}")

  val appId = "app1"
  val user = "user1"

  val executorInfos = (1 to 5).map(
    createTestExecutorInfo(appId, user, _, includingLogFiles = false))

  // Fallback means the original log URL maps are returned unchanged.
  val expected: Map[ExecutorInfo, Map[String, String]] =
    executorInfos.map(info => info -> info.logUrlMap).toMap

  testHandlingExecutorLogUrl(conf, expected)
}
+
+ private def testHandlingExecutorLogUrl(
+ conf: SparkConf,
+ expectedLogUrlMap: Map[ExecutorInfo, Map[String, String]]): Unit = {
+ val provider = new FsHistoryProvider(conf)
+
+ val attempt1 = newLogFile("app1", Some("attempt1"), inProgress = true)
+
+ val executorAddedEvents = expectedLogUrlMap.keys.zipWithIndex.map { case
(execInfo, idx) =>
+ val event = SparkListenerExecutorAdded(1 + idx, s"exec$idx", execInfo)
+ event
+ }.toList.sortBy(_.time)
+ val allEvents = List(SparkListenerApplicationStart("app1", Some("app1"),
1L,
+ "test", Some("attempt1"))) ++ executorAddedEvents
+
+ writeFile(attempt1, true, None, allEvents: _*)
+
+ updateAndCheck(provider) { list =>
+ list.size should be (1)
+ list.head.attempts.size should be (1)
+
+ list.foreach { app =>
+ app.attempts.foreach { attempt =>
+ val appUi = provider.getAppUI(app.id, attempt.attemptId)
+ appUi should not be null
+ val executors = appUi.get.ui.store.executorList(false).iterator
+ executors should not be null
+
+ val iterForExpectation = expectedLogUrlMap.iterator
+
+ var executorCount = 0
+ while (executors.hasNext) {
+ val executor = executors.next()
+ val expectation = iterForExpectation.next()
Review comment:
`val (something, something) = iter.next()`
`_1` and `_2` are not good names, so avoid using them.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]