[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r354796867

## File path: server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
## @@ -324,15 +333,22 @@ class SparkYarnApp private[utils] (
         }
       }
+      yarnTagToAppIdFailedTimes = 0
+
       debug(s"$appId $state ${yarnDiagnostics.mkString(" ")}")
     } catch {
-      case _: InterruptedException =>
-        yarnDiagnostics = ArrayBuffer("Session stopped by user.")
-        changeState(SparkApp.State.KILLED)
+      // throw IllegalStateException when getAppId failed

Review comment: updated

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r354797021

## File path: server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
## @@ -324,15 +333,22 @@ class SparkYarnApp private[utils] (
         }
       }
+      yarnTagToAppIdFailedTimes = 0
+
       debug(s"$appId $state ${yarnDiagnostics.mkString(" ")}")
     } catch {
-      case _: InterruptedException =>
-        yarnDiagnostics = ArrayBuffer("Session stopped by user.")
-        changeState(SparkApp.State.KILLED)
+      // throw IllegalStateException when getAppId failed

Review comment: I have reverted it.
[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r354796727

## File path: server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
## @@ -254,50 +268,45 @@ class SparkYarnApp private[utils] (
     }
   }
-  // Exposed for unit test.
-  // TODO Instead of spawning a thread for every session, create a centralized thread and
-  // batch YARN queries.
-  private[utils] val yarnAppMonitorThread = Utils.startDaemonThread(s"yarnAppMonitorThread-$this") {
+  private var yarnTagToAppIdFailedTimes: Int = _
+
+  private def failToMonitor(): Unit = {
+    changeState(SparkApp.State.FAILED)
+    process.foreach(_.destroy())
+    leakedAppTags.put(appTag, System.currentTimeMillis())
+  }
+
+  private def monitorSparkYarnApp(): Unit = {
     try {
+      if (killed) {
+        changeState(SparkApp.State.KILLED)
+      } else if (isProcessErrExit()) {
+        changeState(SparkApp.State.FAILED)
+      }
       // If appId is not known, query YARN by appTag to get it.
-      val appId = try {
-        appIdOption.map(ConverterUtils.toApplicationId).getOrElse {
-          val pollInterval = getYarnPollInterval(livyConf)
-          val deadline = getYarnTagToAppIdTimeout(livyConf).fromNow
-          getAppIdFromTag(appTag, pollInterval, deadline)
+      if (appId.isEmpty) {
+        appId = getAppId()
+        if (appId.isEmpty) {
+          throw new IllegalStateException(s"No YARN application is found with tag " +

Review comment: updated
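The hunk above replaces the blocking per-app daemon thread with a `monitorSparkYarnApp()` method that does one round of work and then returns. The appId lookup is now one attempt per round instead of a loop that runs until a deadline. Below is a minimal sketch of that shape. `TickMonitor`, the simplified states and the `lookup` function (standing in for the YARN query by tag) are hypothetical, not Livy's classes:

```scala
// Hypothetical, simplified stand-in for SparkApp.State.
sealed trait AppState
case object Starting extends AppState
case object Killed extends AppState
case object Failed extends AppState

// One monitoring "tick": check local state first, then make at most one appId lookup.
// `lookup` is a hypothetical stand-in for querying YARN by application tag.
final class TickMonitor(lookup: () => Option[String]) {
  @volatile var killed: Boolean = false
  @volatile var processErrExit: Boolean = false
  var state: AppState = Starting
  var appId: Option[String] = None

  def tick(): Unit = {
    if (killed) {
      state = Killed
    } else if (processErrExit) {
      state = Failed
    }
    // If appId is not known yet, query once; the caller retries on a later tick.
    if (appId.isEmpty) {
      appId = lookup()
      if (appId.isEmpty) {
        throw new IllegalStateException("no application found for tag (sketch)")
      }
    }
  }
}
```

Because `tick()` never sleeps, a small pool of threads can interleave ticks for many apps. Throwing on "not found yet" lets the caller count consecutive failures.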
[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r354783003

## File path: server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
## @@ -111,7 +152,48 @@ object SparkYarnApp extends Logging {
     }
   }
+  class YarnAppMonitorRunnable extends Runnable {
+    override def run(): Unit = {
+      var loop = true
+      while (loop) {
+        try {
+          // update time when monitor app so that
+          // checkMonitorAppTimeoutThread can check whether the thread was blocked on monitoring
+          monitorAppThreadMap.put(Thread.currentThread(), System.currentTimeMillis())
+
+          val app = appQueue.poll()
+          if (app != null) {
+            app.monitorSparkYarnApp()
+            if (app.isRunning) {
+              appQueue.add(app)
+            }
+          }
+
+          Thread.sleep(yarnPoolInterval)
+        } catch {
+          case e: InterruptedException =>
+            loop = false
+            error(s"YarnAppMonitorRunnable Exception whiling monitor", e)
+        }
+      }
+    }
+  }
+
+  private def initYarnAppMonitorThreadPool(livyConf: LivyConf): Unit = {
+    val poolSize = livyConf.getInt(LivyConf.YARN_APP_LOOKUP_THREAD_POOL_SIZE)
+    val yarnAppMonitorThreadPool: ExecutorService =
+      Executors.newFixedThreadPool(poolSize)
+
+    val runnable = new YarnAppMonitorRunnable()
+    for (i <- 0 until poolSize) {
+      yarnAppMonitorThreadPool.execute(runnable)
+    }

Review comment: @jahstreet Hi, this is because we start `poolSize` long-running threads to monitor all the apps, and each `execute` call starts one of them.
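Below is a minimal, self-contained sketch of the pattern described in this reply. A fixed pool receives `poolSize` tasks that never return, so each `execute` call permanently occupies one pool thread. All workers share one `ConcurrentLinkedQueue`, and an app goes back on the queue only while it is still running. `FakeApp` and `MonitorPool` are hypothetical stand-ins, not Livy code:

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, ExecutorService, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for SparkYarnApp: it "finishes" after `ticksNeeded` monitor calls.
final class FakeApp(ticksNeeded: Int, done: CountDownLatch) {
  val ticks = new AtomicInteger(0)
  def monitor(): Unit = if (ticks.incrementAndGet() == ticksNeeded) done.countDown()
  def isRunning: Boolean = ticks.get() < ticksNeeded
}

final class MonitorPool(poolSize: Int, intervalMs: Long) {
  val queue = new ConcurrentLinkedQueue[FakeApp]()
  private val pool: ExecutorService = Executors.newFixedThreadPool(poolSize)

  // A long-running loop: each execute() below keeps one pool thread busy until shutdown.
  private val worker: Runnable = new Runnable {
    override def run(): Unit = {
      var loop = true
      while (loop) {
        try {
          val app = queue.poll()
          if (app != null) {
            app.monitor()
            if (app.isRunning) queue.add(app) // re-enqueue only unfinished apps
          }
          Thread.sleep(intervalMs)
        } catch {
          case _: InterruptedException => loop = false // shutdownNow() ends the loop
        }
      }
    }
  }
  (0 until poolSize).foreach(_ => pool.execute(worker))

  def shutdown(): Unit = { pool.shutdownNow(); pool.awaitTermination(5, TimeUnit.SECONDS) }
}
```

Sharing one `Runnable` instance across all `execute` calls is safe here because the only per-loop state (`loop`, `app`) is local to `run()`.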
[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r354652037

## File path: server/src/main/scala/org/apache/livy/LivyConf.scala
## @@ -205,6 +205,13 @@ object LivyConf {
   // If Livy can't find the yarn app within this time, consider it lost.
   val YARN_APP_LOOKUP_TIMEOUT = Entry("livy.server.yarn.app-lookup-timeout", "120s")
+  // If Livy can't find the yarn app within this max times, consider it lost.
+  val YARN_APP_LOOKUP_MAX_FAILED_TIMES = Entry("livy.server.yarn.app-lookup.max-failed.times", 120)

Review comment: @jerryshao updated
[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r354652037

## File path: server/src/main/scala/org/apache/livy/LivyConf.scala
## @@ -205,6 +205,13 @@ object LivyConf {
   // If Livy can't find the yarn app within this time, consider it lost.
   val YARN_APP_LOOKUP_TIMEOUT = Entry("livy.server.yarn.app-lookup-timeout", "120s")
+  // If Livy can't find the yarn app within this max times, consider it lost.
+  val YARN_APP_LOOKUP_MAX_FAILED_TIMES = Entry("livy.server.yarn.app-lookup.max-failed.times", 120)

Review comment: @jerryshao Updated. 30 times is enough: the thread sleeps 5 seconds after each attempt, so the total time is at least 30 * 5 s = 150 seconds.
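The arithmetic behind choosing 30 attempts can be checked with `scala.concurrent.duration`. The check assumes one lookup attempt per 5 s sleep and ignores the time the YARN query itself takes, so the real window is somewhat longer:

```scala
import scala.concurrent.duration._

// Assumption: one lookup attempt per 5 s sleep; RPC time is ignored.
val pollInterval: FiniteDuration = 5.seconds
val maxFailedTimes: Int = 30
val oldLookupTimeout: FiniteDuration = 120.seconds // previous livy.server.yarn.app-lookup-timeout default

// 30 attempts * 5 s = 150 s, which covers the old 120 s timeout.
val lookupWindow: FiniteDuration = pollInterval * maxFailedTimes.toLong
```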
[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r354644917

## File path: server/src/main/scala/org/apache/livy/LivyConf.scala
## @@ -216,11 +213,27 @@ object LivyConf {
   // RSC related jars separated with comma.
   val RSC_JARS = Entry("livy.rsc.jars", null)
-  // How long to check livy session leakage
+  // How long to check livy session leakage.
   val YARN_APP_LEAKAGE_CHECK_TIMEOUT = Entry("livy.server.yarn.app-leakage.check-timeout", "600s")
-  // how often to check livy session leakage
+  // How often to check livy session leakage.
   val YARN_APP_LEAKAGE_CHECK_INTERVAL = Entry("livy.server.yarn.app-leakage.check-interval", "60s")
+  // The size of thread pool to monitor all yarn apps.
+  val YARN_APP_MONITOR_THREAD_POOL_SIZE =
+    Entry("livy.server.yarn.app-monitor.thread-pool.size", 4)
+  // How often thread monitor the YARN app.
+  val YARN_APP_MONITOR_THREAD_INTERVAL = Entry("livy.server.yarn.app-monitor.thread-interval", "5s")
+  // How often to check monitor thread block
+  val YARN_APP_MONITOR_THREAD_BLOCK_CHECK_INTERVAL =
+    Entry("livy.server.yarn.app-monitor.thread-block.check-interval", "10s")
+  // If some thread cost more than this config to monitor one app,
+  // stop the monitor of the app which considered blocked.
+  val YARN_APP_MONITOR_THREAD_BLOCK_TIMEOUT =
+    Entry("livy.server.yarn.app-monitor.thread-block-timeout", "60s")
+  // If Livy can't monitor the yarn app successfully within this max times, consider the app failed.
+  val YARN_APP_MONITOR_MAX_FAILED_TIMES =
+    Entry("livy.server.yarn.app-monitor.max-failed.times", 12)

Review comment: @jerryshao updated
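One consequence of these defaults is worth keeping in mind. Each worker sleeps for the thread interval after every app it checks, so the pool handles at most `poolSize` apps per interval. A given app is therefore revisited roughly every `ceil(apps / poolSize) * interval`, not every interval. The rough model below assumes RPC time is negligible and that apps are spread evenly across workers. The function names are hypothetical:

```scala
import scala.concurrent.duration._

// Rough model: each worker handles one app per `interval`, so with `apps` queued
// and `poolSize` workers, an app waits about ceil(apps / poolSize) rounds between checks.
def revisitPeriod(apps: Int, poolSize: Int, interval: FiniteDuration): FiniteDuration = {
  val rounds = math.max(1, (apps + poolSize - 1) / poolSize)
  interval * rounds.toLong
}

// Approximate time until an app is declared failed after `maxFailedTimes` failed checks.
def failureWindow(apps: Int, poolSize: Int, interval: FiniteDuration, maxFailedTimes: Int): FiniteDuration =
  revisitPeriod(apps, poolSize, interval) * maxFailedTimes.toLong
```

Under this model, with 4 workers, a 5 s interval and 12 allowed failures, a handful of apps fails after about 60 s. With 100 concurrent apps, each app is checked only every 125 s, so the same 12 failures take about 25 minutes.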
[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r354644888

## File path: server/src/main/scala/org/apache/livy/LivyConf.scala
## @@ -202,11 +202,8 @@ object LivyConf {
   // Livy will cache the max no of logs specified. 0 means don't cache the logs.
   val SPARK_LOGS_SIZE = Entry("livy.cache-log.size", 200)
-  // If Livy can't find the yarn app within this time, consider it lost.
-  val YARN_APP_LOOKUP_TIMEOUT = Entry("livy.server.yarn.app-lookup-timeout", "120s")
-
-  // How often Livy polls YARN to refresh YARN app state.
-  val YARN_POLL_INTERVAL = Entry("livy.server.yarn.poll-interval", "5s")

Review comment: @jerryshao updated
[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r354644854

## File path: server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
## @@ -56,20 +69,55 @@ object SparkYarnApp extends Logging {
     c
   }
-  private def getYarnTagToAppIdTimeout(livyConf: LivyConf): FiniteDuration =
-    livyConf.getTimeAsMs(LivyConf.YARN_APP_LOOKUP_TIMEOUT) milliseconds
-
-  private def getYarnPollInterval(livyConf: LivyConf): FiniteDuration =
-    livyConf.getTimeAsMs(LivyConf.YARN_POLL_INTERVAL) milliseconds
-
   private[utils] val appType = Set("SPARK").asJava
   private[utils] val leakedAppTags = new java.util.concurrent.ConcurrentHashMap[String, Long]()
+  private val monitorAppThreadMap = new java.util.concurrent.ConcurrentHashMap[Thread, Long]()
+
+  private val appQueue = new ConcurrentLinkedQueue[SparkYarnApp]()
+
   private var sessionLeakageCheckTimeout: Long = _
   private var sessionLeakageCheckInterval: Long = _
+  private var yarnAppMonitorThreadInterval: Long = _
+
+  private var yarnAppMonitorThreadBlockCheckInterval: Long = _
+
+  private var yarnAppMonitorThreadBlockTimeout: Long = _
+
+  private var yarnAppMonitorMaxFailedTimes: Long = _
+
+  private var yarnTagToAppIdMaxFailedTimes: Long = _
+
+  private val checkMonitorAppTimeoutThread = new Thread() {
+    override def run(): Unit = {
+      while (true) {
+        try {
+          val iter = monitorAppThreadMap.entrySet().iterator()
+          val now = System.currentTimeMillis()
+
+          while (iter.hasNext) {
+            val entry = iter.next()
+            val thread = entry.getKey
+            val updatedTime = entry.getValue
+
+            if (now - updatedTime - yarnAppMonitorThreadInterval >
+              yarnAppMonitorThreadBlockTimeout) {
+              thread.interrupt()
+            }
+          }
+
+          Thread.sleep(yarnAppMonitorThreadBlockCheckInterval)
+        } catch {
+          case e: InterruptedException =>
+            error(s"checkMonitorAppTimeoutThread Exception whiling monitor", e)

Review comment: @jerryshao updated
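The watchdog's staleness rule can be pulled out as a pure function, which makes it easy to test. A worker counts as blocked when `now - lastHeartbeat - interval > blockTimeout`. With the defaults (5 s interval, 60 s timeout), that means its heartbeat is more than 65 s old. In the minimal sketch below, `Watchdog`, `isStale` and `interruptStale` are hypothetical names. The check itself mirrors the hunk:

```scala
import java.util.concurrent.ConcurrentHashMap

object Watchdog {
  // Same condition as the hunk: the worker's own sleep interval is subtracted,
  // so a thread that is merely sleeping between apps is not treated as blocked.
  def isStale(now: Long, lastHeartbeat: Long, intervalMs: Long, blockTimeoutMs: Long): Boolean =
    now - lastHeartbeat - intervalMs > blockTimeoutMs

  // One pass of the watchdog loop: interrupt every thread whose heartbeat is stale.
  // Returns how many threads were interrupted.
  def interruptStale(heartbeats: ConcurrentHashMap[Thread, Long], now: Long,
                     intervalMs: Long, blockTimeoutMs: Long): Int = {
    var interrupted = 0
    val iter = heartbeats.entrySet().iterator()
    while (iter.hasNext) {
      val entry = iter.next()
      if (isStale(now, entry.getValue, intervalMs, blockTimeoutMs)) {
        entry.getKey.interrupt()
        interrupted += 1
      }
    }
    interrupted
  }
}
```

Interrupting helps only if the blocked call responds to interruption. A classic blocking `java.net.Socket` read, for example, does not.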
[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r351624126

## File path: server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
## @@ -98,7 +118,36 @@ object SparkYarnApp extends Logging {
       }
     }
+    val yarnAppMonitorThreadPool: ExecutorService =
+      Executors.newFixedThreadPool(yarnAppMonitorThreadPoolSize)
+
+    for (i <- 0 until yarnAppMonitorThreadPoolSize) {
+      yarnAppMonitorThreadPool.execute(new Runnable {
+        override def run(): Unit = {
+          var loop = true
+          try {
+            while (loop) {
+              val app = appQueue.poll()
+              if (app != null) {
+                app.monitorSparkYarnApp()

Review comment: @jerryshao @yiheng I added `checkMonitorAppTimeoutThread` to check whether `monitorSparkYarnApp` is blocked.
[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r350674099

## File path: server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
## @@ -122,10 +162,13 @@ class SparkYarnApp private[utils] (
   with Logging {
   import SparkYarnApp._
+  appQueue.add(this)
+
   private var killed = false
-  private val appIdPromise: Promise[ApplicationId] = Promise()
   private[utils] var state: SparkApp.State = SparkApp.State.STARTING
   private var yarnDiagnostics: IndexedSeq[String] = IndexedSeq.empty[String]
+  private var appInfo = AppInfo()

Review comment: @yiheng it changes at this line: https://github.com/apache/incubator-livy/pull/242/files#diff-e0ea718a9f6261f348cba9e32f25da07R321
[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r347187839

## File path: server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
## @@ -98,7 +116,35 @@ object SparkYarnApp extends Logging {
       }
     }
-
+  private val yarnAppMonitorThread = new Thread() {
+    override def run() : Unit = {
+      val executor = Executors.newSingleThreadExecutor
+      while (true) {
+        for ((app, appTag) <- appMap) {
+          Future {

Review comment: @jerryshao The main purpose of the executor inside the `Future` is to bound the run time of each thread in the thread pool in case an RPC blocks. If an RPC does block, the thread can be cancelled with `handler.cancel(true)`. Additionally, `val executor = Executors.newSingleThreadExecutor` should be inside the `Future`; I have updated that.
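This reply describes running each blocking call on its own executor and cancelling it on timeout. That can be sketched with plain `java.util.concurrent` calls. `Future.get(timeout, unit)` and `cancel(true)` are standard JDK methods. `BoundedCall` and `callWithTimeout` are hypothetical names, and the body passed in stands in for a YARN RPC. Note that `cancel(true)` works only by interrupting the thread that runs the task:

```scala
import java.util.concurrent.{Callable, ExecutorService, Executors, TimeUnit, TimeoutException}

object BoundedCall {
  // Runs `body` on `executor` and waits at most `timeoutMs` for the result.
  // On timeout the task is cancelled with cancel(true), which interrupts its thread.
  def callWithTimeout[T](executor: ExecutorService, timeoutMs: Long)(body: => T): Option[T] = {
    val handler = executor.submit(new Callable[T] { override def call(): T = body })
    try Some(handler.get(timeoutMs, TimeUnit.MILLISECONDS))
    catch {
      case _: TimeoutException =>
        handler.cancel(true)
        None
    }
  }
}
```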
[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r350674099

## File path: server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
## @@ -122,10 +162,13 @@ class SparkYarnApp private[utils] (
   with Logging {
   import SparkYarnApp._
+  appQueue.add(this)
+
   private var killed = false
-  private val appIdPromise: Promise[ApplicationId] = Promise()
   private[utils] var state: SparkApp.State = SparkApp.State.STARTING
   private var yarnDiagnostics: IndexedSeq[String] = IndexedSeq.empty[String]
+  private var appInfo = AppInfo()

Review comment: @yiheng it changes at this line: https://github.com/apache/incubator-livy/pull/242/files#diff-e0ea718a9f6261f348cba9e32f25da07R310
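A general Scala point applies to the placement of `appQueue.add(this)` in this hunk. Statements in a class body run in textual order as part of the constructor. Registering the object at the top therefore publishes it before the fields declared below it are initialized, and a worker that polls it immediately could observe default values (`null`, `false`). The single-threaded illustration below uses a hypothetical registry that reads each object as soon as it is added, standing in for a fast worker thread:

```scala
import scala.collection.mutable.ArrayBuffer

trait Describable { def describe: String }

// Hypothetical registry that inspects objects as soon as they are added,
// standing in for a monitor thread that polls the queue immediately.
object EagerRegistry {
  val observed: ArrayBuffer[String] = ArrayBuffer.empty[String]
  def add(d: Describable): Unit = observed += d.describe
}

class EarlyRegistration extends Describable {
  EagerRegistry.add(this) // runs before `state` below is assigned, so it sees null
  private val state: String = "STARTING"
  def describe: String = s"state=$state"
}

class LateRegistration extends Describable {
  private val state: String = "STARTING"
  def describe: String = s"state=$state"
  EagerRegistry.add(this) // runs after the fields above are initialized
}
```

Moving the registration to the end of the class body (or into a factory method) avoids handing out a partially constructed object.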
[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r348449088

## File path: server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
## @@ -98,7 +118,36 @@ object SparkYarnApp extends Logging {
       }
     }
+    val yarnAppMonitorThreadPool: ExecutorService =
+      Executors.newFixedThreadPool(yarnAppMonitorThreadPoolSize)
+
+    for (i <- 0 until yarnAppMonitorThreadPoolSize) {
+      yarnAppMonitorThreadPool.execute(new Runnable {

Review comment: @jerryshao updated
[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r350700704

## File path: server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
## @@ -165,40 +231,24 @@ class SparkYarnApp private[utils] (
   /**
    * Find the corresponding YARN application id from an application tag.
    *
-   * @param appTag The application tag tagged on the target application.
-   *               If the tag is not unique, it returns the first application it found.
-   *               It will be converted to lower case to match YARN's behaviour.
    * @return ApplicationId or the failure.
    */
-  @tailrec
-  private def getAppIdFromTag(
-      appTag: String,
-      pollInterval: Duration,
-      deadline: Deadline): ApplicationId = {
-    if (isProcessErrExit()) {
-      throw new IllegalStateException("spark-submit start failed")
-    }
-
-    val appTagLowerCase = appTag.toLowerCase()
-
-    // FIXME Should not loop thru all YARN applications but YarnClient doesn't offer an API.
-    // Consider calling rmClient in YarnClient directly.
-    yarnClient.getApplications(appType).asScala.find(_.getApplicationTags.contains(appTagLowerCase))
-    match {
-      case Some(app) => app.getApplicationId
-      case None =>
-        if (deadline.isOverdue) {
-          process.foreach(_.destroy())
-          leakedAppTags.put(appTag, System.currentTimeMillis())
+  private def getAppId(): ApplicationId = {

Review comment: @yiheng @jahstreet updated.
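The removed `getAppIdFromTag` lower-cased the tag to match YARN's behaviour and returned the first application whose tags contained it. Once the retry loop moves into the monitor threads, that lookup becomes a single attempt that returns an `Option`. In the sketch below, `AppReport` is a hypothetical stand-in for YARN's application report and `TagLookup.findAppIdByTag` is a hypothetical name. Using `Locale.ROOT` is a choice made here so that lower-casing does not depend on the JVM's default locale:

```scala
import java.util.Locale

// Hypothetical stand-in for a YARN application report: its id plus its (lower-case) tags.
final case class AppReport(applicationId: String, tags: Set[String])

object TagLookup {
  // One non-blocking attempt: no sleeping and no deadline; the caller retries on a later round.
  // If several applications carry the tag, the first one found wins, as in the removed code.
  def findAppIdByTag(apps: Seq[AppReport], appTag: String): Option[String] = {
    val tagLowerCase = appTag.toLowerCase(Locale.ROOT)
    apps.find(_.tags.contains(tagLowerCase)).map(_.applicationId)
  }
}
```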
[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r350701490

## File path: server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
## @@ -311,15 +322,22 @@ class SparkYarnApp private[utils] (
         }
       }
+      yarnTagToAppIdFailedTimes = 0
+
       debug(s"$appId $state ${yarnDiagnostics.mkString(" ")}")
     } catch {
-      case _: InterruptedException =>
-        yarnDiagnostics = ArrayBuffer("Session stopped by user.")
-        changeState(SparkApp.State.KILLED)
+      // throw IllegalStateException when getAppId failed
+      case e: IllegalStateException =>
+        yarnTagToAppIdFailedTimes += 1
+        if (yarnTagToAppIdFailedTimes > getYarnTagToAppIdMaxFailedTimes(livyConf)) {

Review comment: @yiheng updated
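This hunk implements a consecutive-failure counter. The counter is reset to 0 whenever a monitoring round completes and incremented on each `IllegalStateException`. The app is failed only once the count exceeds the configured maximum. In the minimal sketch below, `ConsecutiveFailures` is a hypothetical name and the `onFail` callback stands in for `failToMonitor()`:

```scala
// Counts consecutive failures: `maxFailedTimes` failures in a row are tolerated and
// the next one triggers `onFail` (mirroring `yarnTagToAppIdFailedTimes > max`).
final class ConsecutiveFailures(maxFailedTimes: Int, onFail: () => Unit) {
  private var failedTimes = 0

  def run(attempt: () => Unit): Unit =
    try {
      attempt()
      failedTimes = 0 // any successful round resets the counter
    } catch {
      case _: IllegalStateException =>
        failedTimes += 1
        if (failedTimes > maxFailedTimes) onFail()
    }

  def current: Int = failedTimes
}
```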
[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r348332474

## File path: server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
## @@ -98,7 +118,36 @@ object SparkYarnApp extends Logging {
       }
     }
+    val yarnAppMonitorThreadPool: ExecutorService =
+      Executors.newFixedThreadPool(yarnAppMonitorThreadPoolSize)
+
+    for (i <- 0 until yarnAppMonitorThreadPoolSize) {
+      yarnAppMonitorThreadPool.execute(new Runnable {
+        override def run(): Unit = {
+          var loop = true
+          try {
+            while (loop) {
+              val app = appQueue.poll()
+              if (app != null) {
+                app.monitorSparkYarnApp()
+              }
+
+              if (app.isRunning) {
+                appQueue.add(app)
+              }
+
+              Thread.sleep(yarnPollInterval)
+            }
+          } catch {
+            case _: InterruptedException => loop = false

Review comment: I have changed it, thanks.