runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r354644854
 
 

 ##########
 File path: server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
 ##########
 @@ -56,20 +69,55 @@ object SparkYarnApp extends Logging {
     c
   }
 
-  private def getYarnTagToAppIdTimeout(livyConf: LivyConf): FiniteDuration =
-    livyConf.getTimeAsMs(LivyConf.YARN_APP_LOOKUP_TIMEOUT) milliseconds
-
-  private def getYarnPollInterval(livyConf: LivyConf): FiniteDuration =
-    livyConf.getTimeAsMs(LivyConf.YARN_POLL_INTERVAL) milliseconds
-
   private[utils] val appType = Set("SPARK").asJava
 
   private[utils] val leakedAppTags = new 
java.util.concurrent.ConcurrentHashMap[String, Long]()
 
+  private val monitorAppThreadMap = new 
java.util.concurrent.ConcurrentHashMap[Thread, Long]()
+
+  private val appQueue = new ConcurrentLinkedQueue[SparkYarnApp]()
+
   private var sessionLeakageCheckTimeout: Long = _
 
   private var sessionLeakageCheckInterval: Long = _
 
+  private var yarnAppMonitorThreadInterval: Long = _
+
+  private var yarnAppMonitorThreadBlockCheckInterval: Long = _
+
+  private var yarnAppMonitorThreadBlockTimeout: Long = _
+
+  private var yarnAppMonitorMaxFailedTimes: Long = _
+
+  private var yarnTagToAppIdMaxFailedTimes: Long = _
+
+  private val checkMonitorAppTimeoutThread = new Thread() {
+    override def run(): Unit = {
+      while (true) {
+        try {
+          val iter = monitorAppThreadMap.entrySet().iterator()
+          val now = System.currentTimeMillis()
+
+          while (iter.hasNext) {
+            val entry = iter.next()
+            val thread = entry.getKey
+            val updatedTime = entry.getValue
+
+            if (now - updatedTime - yarnAppMonitorThreadInterval >
+              yarnAppMonitorThreadBlockTimeout) {
+              thread.interrupt()
+            }
+          }
+
+          Thread.sleep(yarnAppMonitorThreadBlockCheckInterval)
+        } catch {
+          case e: InterruptedException =>
+            error(s"checkMonitorAppTimeoutThread Exception whiling monitor", e)
 
 Review comment:
   @jerryshao updated

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to