runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn URL: https://github.com/apache/incubator-livy/pull/242#discussion_r354644854
########## File path: server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala ########## @@ -56,20 +69,55 @@ object SparkYarnApp extends Logging { c } - private def getYarnTagToAppIdTimeout(livyConf: LivyConf): FiniteDuration = - livyConf.getTimeAsMs(LivyConf.YARN_APP_LOOKUP_TIMEOUT) milliseconds - - private def getYarnPollInterval(livyConf: LivyConf): FiniteDuration = - livyConf.getTimeAsMs(LivyConf.YARN_POLL_INTERVAL) milliseconds - private[utils] val appType = Set("SPARK").asJava private[utils] val leakedAppTags = new java.util.concurrent.ConcurrentHashMap[String, Long]() + private val monitorAppThreadMap = new java.util.concurrent.ConcurrentHashMap[Thread, Long]() + + private val appQueue = new ConcurrentLinkedQueue[SparkYarnApp]() + private var sessionLeakageCheckTimeout: Long = _ private var sessionLeakageCheckInterval: Long = _ + private var yarnAppMonitorThreadInterval: Long = _ + + private var yarnAppMonitorThreadBlockCheckInterval: Long = _ + + private var yarnAppMonitorThreadBlockTimeout: Long = _ + + private var yarnAppMonitorMaxFailedTimes: Long = _ + + private var yarnTagToAppIdMaxFailedTimes: Long = _ + + private val checkMonitorAppTimeoutThread = new Thread() { + override def run(): Unit = { + while (true) { + try { + val iter = monitorAppThreadMap.entrySet().iterator() + val now = System.currentTimeMillis() + + while (iter.hasNext) { + val entry = iter.next() + val thread = entry.getKey + val updatedTime = entry.getValue + + if (now - updatedTime - yarnAppMonitorThreadInterval > + yarnAppMonitorThreadBlockTimeout) { + thread.interrupt() + } + } + + Thread.sleep(yarnAppMonitorThreadBlockCheckInterval) + } catch { + case e: InterruptedException => + error(s"checkMonitorAppTimeoutThread Exception whiling monitor", e) Review comment: @jerryshao updated ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services