This is an automated email from the ASF dual-hosted git repository. jshao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-livy.git
The following commit(s) were added to refs/heads/master by this push: new cccba94 [LIVY-714][SERVER] Fix cannot remove the app in leakedAppTags when timeout cccba94 is described below commit cccba9480e2db821d6cc67f580eeb67f2fac4e95 Author: runzhiwang <runzhiw...@tencent.com> AuthorDate: Mon Nov 25 11:44:37 2019 +0800 [LIVY-714][SERVER] Fix cannot remove the app in leakedAppTags when timeout ## What changes were proposed in this pull request? 1. `var isRemoved = false` should be declared inside `while(iter.hasNext)`. Otherwise, if there are two apps in the same pass, where the first is killed via `killApplication` and the second has timed out, removing the first app sets `isRemoved = true`, so the second app cannot pass the `if(!isRemoved)` check and will only be deleted in the next pass. 2. `entry.getValue - now` is negative and therefore never greater than `sessionLeakageCheckTimeout`; the comparison should use `now - entry.getValue` instead. ![image](https://user-images.githubusercontent.com/51938049/69202431-99a81080-0b7c-11ea-8084-9801af5a75bd.png) ## How was this patch tested? Existing IT and UT. Author: runzhiwang <runzhiw...@tencent.com> Closes #259 from runzhiwang/leakapp. 
--- .../scala/org/apache/livy/utils/SparkYarnApp.scala | 27 ++++++++++++++++------ .../org/apache/livy/utils/SparkYarnAppSpec.scala | 21 +++++++++++++++++ 2 files changed, 41 insertions(+), 7 deletions(-) diff --git a/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala index 14af9fa..a245823 100644 --- a/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala +++ b/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala @@ -37,7 +37,8 @@ import org.apache.livy.{LivyConf, Logging, Utils} object SparkYarnApp extends Logging { - def init(livyConf: LivyConf): Unit = { + def init(livyConf: LivyConf, client: Option[YarnClient] = None): Unit = { + mockYarnClient = client sessionLeakageCheckInterval = livyConf.getTimeAsMs(LivyConf.YARN_APP_LEAKAGE_CHECK_INTERVAL) sessionLeakageCheckTimeout = livyConf.getTimeAsMs(LivyConf.YARN_APP_LEAKAGE_CHECK_TIMEOUT) leakedAppsGCThread.setDaemon(true) @@ -45,6 +46,8 @@ object SparkYarnApp extends Logging { leakedAppsGCThread.start() } + private var mockYarnClient: Option[YarnClient] = None + // YarnClient is thread safe. Create once, share it across threads. 
lazy val yarnClient = { val c = YarnClient.createYarnClient() @@ -59,9 +62,9 @@ object SparkYarnApp extends Logging { private def getYarnPollInterval(livyConf: LivyConf): FiniteDuration = livyConf.getTimeAsMs(LivyConf.YARN_POLL_INTERVAL) milliseconds - private val appType = Set("SPARK").asJava + private[utils] val appType = Set("SPARK").asJava - private val leakedAppTags = new java.util.concurrent.ConcurrentHashMap[String, Long]() + private[utils] val leakedAppTags = new java.util.concurrent.ConcurrentHashMap[String, Long]() private var sessionLeakageCheckTimeout: Long = _ @@ -69,24 +72,34 @@ object SparkYarnApp extends Logging { private val leakedAppsGCThread = new Thread() { override def run(): Unit = { + val client = { + mockYarnClient match { + case Some(client) => client + case None => yarnClient + } + } + while (true) { if (!leakedAppTags.isEmpty) { // kill the app if found it and remove it if exceeding a threshold val iter = leakedAppTags.entrySet().iterator() - var isRemoved = false val now = System.currentTimeMillis() - val apps = yarnClient.getApplications(appType).asScala + val apps = client.getApplications(appType).asScala + while(iter.hasNext) { + var isRemoved = false val entry = iter.next() + apps.find(_.getApplicationTags.contains(entry.getKey)) .foreach({ e => info(s"Kill leaked app ${e.getApplicationId}") - yarnClient.killApplication(e.getApplicationId) + client.killApplication(e.getApplicationId) iter.remove() isRemoved = true }) + if (!isRemoved) { - if ((entry.getValue - now) > sessionLeakageCheckTimeout) { + if ((now - entry.getValue) > sessionLeakageCheckTimeout) { iter.remove() info(s"Remove leaked yarn app tag ${entry.getKey}") } diff --git a/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala b/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala index 064bb77..d43125d 100644 --- a/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala +++ 
b/server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala @@ -16,6 +16,7 @@ */ package org.apache.livy.utils +import java.util.ArrayList import java.util.concurrent.{CountDownLatch, TimeUnit} import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} @@ -461,5 +462,25 @@ class SparkYarnAppSpec extends FunSpec with LivyBaseUnitTestSuite { } } } + + it("should delete leak app when timeout") { + Clock.withSleepMethod(mockSleep) { + livyConf.set(LivyConf.YARN_APP_LEAKAGE_CHECK_INTERVAL, "100ms") + livyConf.set(LivyConf.YARN_APP_LEAKAGE_CHECK_TIMEOUT, "1000ms") + + val client = mock[YarnClient] + when(client.getApplications(SparkYarnApp.appType)). + thenReturn(new ArrayList[ApplicationReport]()) + + SparkYarnApp.init(livyConf, Some(client)) + + SparkYarnApp.leakedAppTags.clear() + SparkYarnApp.leakedAppTags.put("leakApp", System.currentTimeMillis()) + + Eventually.eventually(Eventually.timeout(TEST_TIMEOUT), Eventually.interval(100 millis)) { + assert(SparkYarnApp.leakedAppTags.size() == 0) + } + } + } } }