[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn

2019-12-06 Thread GitBox
runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy 
should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r354796867
 
 

 ##
 File path: server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
 ##
 @@ -324,15 +333,22 @@ class SparkYarnApp private[utils] (
 }
   }
 
+  yarnTagToAppIdFailedTimes = 0
+
   debug(s"$appId $state ${yarnDiagnostics.mkString(" ")}")
 } catch {
-  case _: InterruptedException =>
-yarnDiagnostics = ArrayBuffer("Session stopped by user.")
-changeState(SparkApp.State.KILLED)
+  // throw IllegalStateException when getAppId failed
 
 Review comment:
   updated


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn

2019-12-06 Thread GitBox
runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy 
should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r354797021
 
 

 ##
 File path: server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
 ##
 @@ -324,15 +333,22 @@ class SparkYarnApp private[utils] (
 }
   }
 
+  yarnTagToAppIdFailedTimes = 0
+
   debug(s"$appId $state ${yarnDiagnostics.mkString(" ")}")
 } catch {
-  case _: InterruptedException =>
-yarnDiagnostics = ArrayBuffer("Session stopped by user.")
-changeState(SparkApp.State.KILLED)
+  // throw IllegalStateException when getAppId failed
 
 Review comment:
   I have reverted it.




[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn

2019-12-06 Thread GitBox
runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy 
should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r354796727
 
 

 ##
 File path: server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
 ##
 @@ -254,50 +268,45 @@ class SparkYarnApp private[utils] (
 }
   }
 
-  // Exposed for unit test.
-  // TODO Instead of spawning a thread for every session, create a centralized thread and
-  // batch YARN queries.
-  private[utils] val yarnAppMonitorThread = Utils.startDaemonThread(s"yarnAppMonitorThread-$this") {
+  private var yarnTagToAppIdFailedTimes: Int = _
+
+  private def failToMonitor(): Unit = {
+changeState(SparkApp.State.FAILED)
+process.foreach(_.destroy())
+leakedAppTags.put(appTag, System.currentTimeMillis())
+  }
+
+  private def monitorSparkYarnApp(): Unit = {
 try {
+  if (killed) {
+changeState(SparkApp.State.KILLED)
+  } else if (isProcessErrExit()) {
+changeState(SparkApp.State.FAILED)
+  }
   // If appId is not known, query YARN by appTag to get it.
-  val appId = try {
-appIdOption.map(ConverterUtils.toApplicationId).getOrElse {
-  val pollInterval = getYarnPollInterval(livyConf)
-  val deadline = getYarnTagToAppIdTimeout(livyConf).fromNow
-  getAppIdFromTag(appTag, pollInterval, deadline)
+  if (appId.isEmpty) {
+appId = getAppId()
+if (appId.isEmpty) {
+  throw new IllegalStateException(s"No YARN application is found with tag " +
 
 Review comment:
   updated




[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn

2019-12-06 Thread GitBox
runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy 
should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r354783003
 
 

 ##
 File path: server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
 ##
 @@ -111,7 +152,48 @@ object SparkYarnApp extends Logging {
 }
   }
 
+  class YarnAppMonitorRunnable extends Runnable {
+override def run(): Unit = {
+  var loop = true
+  while (loop) {
+try {
+  // update time when monitor app so that
+  // checkMonitorAppTimeoutThread can check whether the thread was blocked on monitoring
+  monitorAppThreadMap.put(Thread.currentThread(), System.currentTimeMillis())
+
+  val app = appQueue.poll()
+  if (app != null) {
+app.monitorSparkYarnApp()
 
+if (app.isRunning) {
+  appQueue.add(app)
+}
+  }
+
+  Thread.sleep(yarnPoolInterval)
+} catch {
+  case e: InterruptedException =>
+loop = false
+error(s"YarnAppMonitorRunnable Exception whiling monitor", e)
+}
+  }
+}
+  }
+
+  private def initYarnAppMonitorThreadPool(livyConf: LivyConf): Unit = {
+val poolSize = livyConf.getInt(LivyConf.YARN_APP_LOOKUP_THREAD_POOL_SIZE)
+val yarnAppMonitorThreadPool: ExecutorService =
+  Executors.newFixedThreadPool(poolSize)
+
+val runnable = new YarnAppMonitorRunnable()
+for (i <- 0 until poolSize) {
+  yarnAppMonitorThreadPool.execute(runnable)
+}
 
 Review comment:
   @jahstreet Hi. We start `poolSize` long-running threads to monitor 
all the apps, and each `execute` call starts one thread.




[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn

2019-12-06 Thread GitBox
runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy 
should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r354783003
 
 

 ##
 File path: server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
 ##
 @@ -111,7 +152,48 @@ object SparkYarnApp extends Logging {
 }
   }
 
+  class YarnAppMonitorRunnable extends Runnable {
+override def run(): Unit = {
+  var loop = true
+  while (loop) {
+try {
+  // update time when monitor app so that
+  // checkMonitorAppTimeoutThread can check whether the thread was blocked on monitoring
+  monitorAppThreadMap.put(Thread.currentThread(), System.currentTimeMillis())
+
+  val app = appQueue.poll()
+  if (app != null) {
+app.monitorSparkYarnApp()
 
+if (app.isRunning) {
+  appQueue.add(app)
+}
+  }
+
+  Thread.sleep(yarnPoolInterval)
+} catch {
+  case e: InterruptedException =>
+loop = false
+error(s"YarnAppMonitorRunnable Exception whiling monitor", e)
+}
+  }
+}
+  }
+
+  private def initYarnAppMonitorThreadPool(livyConf: LivyConf): Unit = {
+val poolSize = livyConf.getInt(LivyConf.YARN_APP_LOOKUP_THREAD_POOL_SIZE)
+val yarnAppMonitorThreadPool: ExecutorService =
+  Executors.newFixedThreadPool(poolSize)
+
+val runnable = new YarnAppMonitorRunnable()
+for (i <- 0 until poolSize) {
+  yarnAppMonitorThreadPool.execute(runnable)
+}
 
 Review comment:
   @jahstreet Hi. We start `poolSize` long-running threads to monitor 
all the apps, and each `execute` call starts one thread.
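
   As an illustration of the pattern described in this comment, here is a 
minimal, self-contained sketch, not the code from the pull request. `FakeApp`, 
`PoolSketch`, and `runPool` are hypothetical names introduced for the example, 
and `FakeApp` only stands in for `SparkYarnApp`. The same `Runnable` is passed 
to `execute` once per pool thread, so each call occupies one thread for the 
lifetime of the pool.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for SparkYarnApp: it "finishes" after a fixed number of polls.
final class FakeApp(pollsUntilDone: Int) {
  private val polls = new AtomicInteger(0)
  def monitor(): Unit = polls.incrementAndGet()
  def isRunning: Boolean = polls.get() < pollsUntilDone
  def pollCount: Int = polls.get()
}

object PoolSketch {
  def runPool(poolSize: Int, apps: Seq[FakeApp], pollIntervalMs: Long): Unit = {
    val queue = new ConcurrentLinkedQueue[FakeApp]()
    apps.foreach(a => queue.add(a))
    val remaining = new CountDownLatch(apps.size)
    val pool = Executors.newFixedThreadPool(poolSize)

    val worker = new Runnable {
      override def run(): Unit = {
        var loop = true
        while (loop) {
          try {
            val app = queue.poll()
            if (app != null) {
              app.monitor()
              // Re-queue apps that are still running; count down the finished ones.
              if (app.isRunning) queue.add(app) else remaining.countDown()
            }
            Thread.sleep(pollIntervalMs)
          } catch {
            case _: InterruptedException => loop = false
          }
        }
      }
    }

    // One execute call per worker thread; the loop never returns until interrupted.
    for (_ <- 0 until poolSize) pool.execute(worker)

    remaining.await(10, TimeUnit.SECONDS)
    pool.shutdownNow() // interrupts the workers so they leave their loops
    pool.awaitTermination(5, TimeUnit.SECONDS)
  }
}
```

   Because an app is removed from the queue while a worker handles it, no two 
workers monitor the same app at the same time in this sketch.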




[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn

2019-12-05 Thread GitBox
runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy 
should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r354652037
 
 

 ##
 File path: server/src/main/scala/org/apache/livy/LivyConf.scala
 ##
 @@ -205,6 +205,13 @@ object LivyConf {
   // If Livy can't find the yarn app within this time, consider it lost.
   val YARN_APP_LOOKUP_TIMEOUT = Entry("livy.server.yarn.app-lookup-timeout", "120s")
 
+  // If Livy can't find the yarn app within this max times, consider it lost.
+  val YARN_APP_LOOKUP_MAX_FAILED_TIMES = Entry("livy.server.yarn.app-lookup.max-failed.times", 120)
 
 Review comment:
   @jerryshao updated




[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn

2019-12-05 Thread GitBox
runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy 
should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r354652037
 
 

 ##
 File path: server/src/main/scala/org/apache/livy/LivyConf.scala
 ##
 @@ -205,6 +205,13 @@ object LivyConf {
   // If Livy can't find the yarn app within this time, consider it lost.
   val YARN_APP_LOOKUP_TIMEOUT = Entry("livy.server.yarn.app-lookup-timeout", "120s")
 
+  // If Livy can't find the yarn app within this max times, consider it lost.
+  val YARN_APP_LOOKUP_MAX_FAILED_TIMES = Entry("livy.server.yarn.app-lookup.max-failed.times", 120)
 
 Review comment:
   @jerryshao updated. 30 times is enough: each attempt sleeps 5 seconds, 
so the total time is at least 150 seconds.
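
   The arithmetic behind that estimate can be checked with a small sketch. 
`LookupBudget` and `minLookupTime` are hypothetical names, and the sketch only 
counts the sleep between attempts, so it gives a lower bound; the time spent in 
each YARN query comes on top of it.

```scala
import scala.concurrent.duration._

object LookupBudget {
  // Lower bound on the time before the lookup is abandoned:
  // one poll-interval sleep per failed attempt.
  def minLookupTime(maxFailedTimes: Int, pollInterval: FiniteDuration): FiniteDuration =
    pollInterval * maxFailedTimes.toLong

  // Values taken from the comment above: 30 attempts, 5 seconds apart.
  val proposed: FiniteDuration = minLookupTime(30, 5.seconds)

  // The previous time-based setting shown in the diff (app-lookup-timeout).
  val previousTimeout: FiniteDuration = 120.seconds
}
```

   With these values, `LookupBudget.proposed` is 150 seconds, which is longer 
than the earlier 120-second lookup timeout.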




[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn

2019-12-05 Thread GitBox
runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy 
should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r354644917
 
 

 ##
 File path: server/src/main/scala/org/apache/livy/LivyConf.scala
 ##
 @@ -216,11 +213,27 @@ object LivyConf {
   // RSC related jars separated with comma.
   val RSC_JARS = Entry("livy.rsc.jars", null)
 
-  // How long to check livy session leakage
+  // How long to check livy session leakage.
   val YARN_APP_LEAKAGE_CHECK_TIMEOUT = Entry("livy.server.yarn.app-leakage.check-timeout", "600s")
-  // how often to check livy session leakage
+  // How often to check livy session leakage.
   val YARN_APP_LEAKAGE_CHECK_INTERVAL = Entry("livy.server.yarn.app-leakage.check-interval", "60s")
 
+  // The size of thread pool to monitor all yarn apps.
+  val YARN_APP_MONITOR_THREAD_POOL_SIZE =
+Entry("livy.server.yarn.app-monitor.thread-pool.size", 4)
+  // How often thread monitor the YARN app.
+  val YARN_APP_MONITOR_THREAD_INTERVAL = Entry("livy.server.yarn.app-monitor.thread-interval", "5s")
+  // How often to check monitor thread block
+  val YARN_APP_MONITOR_THREAD_BLOCK_CHECK_INTERVAL =
+Entry("livy.server.yarn.app-monitor.thread-block.check-interval", "10s")
+  // If some thread cost more than this config to monitor one app,
+  // stop the monitor of the app which considered blocked.
+  val YARN_APP_MONITOR_THREAD_BLOCK_TIMEOUT =
+Entry("livy.server.yarn.app-monitor.thread-block-timeout", "60s")
+  // If Livy can't monitor the yarn app successfully within this max times, consider the app failed.
+  val YARN_APP_MONITOR_MAX_FAILED_TIMES =
+Entry("livy.server.yarn.app-monitor.max-failed.times", 12)
 
 Review comment:
   @jerryshao updated




[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn

2019-12-05 Thread GitBox
runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy 
should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r354644888
 
 

 ##
 File path: server/src/main/scala/org/apache/livy/LivyConf.scala
 ##
 @@ -202,11 +202,8 @@ object LivyConf {
   // Livy will cache the max no of logs specified. 0 means don't cache the logs.
   val SPARK_LOGS_SIZE = Entry("livy.cache-log.size", 200)
 
-  // If Livy can't find the yarn app within this time, consider it lost.
-  val YARN_APP_LOOKUP_TIMEOUT = Entry("livy.server.yarn.app-lookup-timeout", "120s")
-
-  // How often Livy polls YARN to refresh YARN app state.
-  val YARN_POLL_INTERVAL = Entry("livy.server.yarn.poll-interval", "5s")
 
 Review comment:
   @jerryshao updated




[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn

2019-12-05 Thread GitBox
runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy 
should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r354644854
 
 

 ##
 File path: server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
 ##
 @@ -56,20 +69,55 @@ object SparkYarnApp extends Logging {
 c
   }
 
-  private def getYarnTagToAppIdTimeout(livyConf: LivyConf): FiniteDuration =
-livyConf.getTimeAsMs(LivyConf.YARN_APP_LOOKUP_TIMEOUT) milliseconds
-
-  private def getYarnPollInterval(livyConf: LivyConf): FiniteDuration =
-livyConf.getTimeAsMs(LivyConf.YARN_POLL_INTERVAL) milliseconds
-
   private[utils] val appType = Set("SPARK").asJava
 
   private[utils] val leakedAppTags = new java.util.concurrent.ConcurrentHashMap[String, Long]()
 
+  private val monitorAppThreadMap = new java.util.concurrent.ConcurrentHashMap[Thread, Long]()
+
+  private val appQueue = new ConcurrentLinkedQueue[SparkYarnApp]()
+
   private var sessionLeakageCheckTimeout: Long = _
 
   private var sessionLeakageCheckInterval: Long = _
 
+  private var yarnAppMonitorThreadInterval: Long = _
+
+  private var yarnAppMonitorThreadBlockCheckInterval: Long = _
+
+  private var yarnAppMonitorThreadBlockTimeout: Long = _
+
+  private var yarnAppMonitorMaxFailedTimes: Long = _
+
+  private var yarnTagToAppIdMaxFailedTimes: Long = _
+
+  private val checkMonitorAppTimeoutThread = new Thread() {
+override def run(): Unit = {
+  while (true) {
+try {
+  val iter = monitorAppThreadMap.entrySet().iterator()
+  val now = System.currentTimeMillis()
+
+  while (iter.hasNext) {
+val entry = iter.next()
+val thread = entry.getKey
+val updatedTime = entry.getValue
+
+if (now - updatedTime - yarnAppMonitorThreadInterval >
+  yarnAppMonitorThreadBlockTimeout) {
+  thread.interrupt()
+}
+  }
+
+  Thread.sleep(yarnAppMonitorThreadBlockCheckInterval)
+} catch {
+  case e: InterruptedException =>
+error(s"checkMonitorAppTimeoutThread Exception whiling monitor", e)
 
 Review comment:
   @jerryshao updated




[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn

2019-11-27 Thread GitBox
runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy 
should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r351624126
 
 

 ##
 File path: server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
 ##
 @@ -98,7 +118,36 @@ object SparkYarnApp extends Logging {
 }
   }
 
+  val yarnAppMonitorThreadPool: ExecutorService =
+Executors.newFixedThreadPool(yarnAppMonitorThreadPoolSize)
+
+  for (i <- 0 until yarnAppMonitorThreadPoolSize) {
+yarnAppMonitorThreadPool.execute(new Runnable {
+  override def run(): Unit = {
+var loop = true
+try {
+  while (loop) {
+val app = appQueue.poll()
+if (app != null) {
+  app.monitorSparkYarnApp()
 
 Review comment:
   @jerryshao @yiheng I added `checkMonitorAppTimeoutThread` to check whether 
`monitorSparkYarnApp` is blocked.
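
   A minimal sketch of such a watchdog, under stated assumptions: 
`WatchdogSketch`, `beat`, and `interruptStale` are hypothetical names, and the 
staleness test mirrors the `now - updatedTime - interval > timeout` check 
quoted elsewhere in this review. Each worker records a heartbeat before it 
monitors an app, and the checker interrupts any worker whose heartbeat is too 
old. Note that `Thread.interrupt()` only unblocks calls that respond to 
interruption.

```scala
import java.util.concurrent.ConcurrentHashMap

object WatchdogSketch {
  // Last heartbeat per worker thread, in milliseconds since the epoch.
  val heartbeats = new ConcurrentHashMap[Thread, Long]()

  // Called by a worker right before it starts monitoring an app.
  def beat(): Unit =
    heartbeats.put(Thread.currentThread(), System.currentTimeMillis())

  // Interrupt every worker whose heartbeat is older than the allowed time.
  // Returns the number of threads that were interrupted.
  def interruptStale(nowMs: Long, pollIntervalMs: Long, timeoutMs: Long): Int = {
    var interrupted = 0
    val iter = heartbeats.entrySet().iterator()
    while (iter.hasNext) {
      val entry = iter.next()
      if (nowMs - entry.getValue - pollIntervalMs > timeoutMs) {
        entry.getKey.interrupt()
        interrupted += 1
      }
    }
    interrupted
  }
}
```

   In a real checker thread, `interruptStale` would be called in a loop with a 
sleep between passes, as `checkMonitorAppTimeoutThread` does in the diff.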




[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn

2019-11-26 Thread GitBox
runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy 
should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r350674099
 
 

 ##
 File path: server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
 ##
 @@ -122,10 +162,13 @@ class SparkYarnApp private[utils] (
   with Logging {
   import SparkYarnApp._
 
+  appQueue.add(this)
+
   private var killed = false
-  private val appIdPromise: Promise[ApplicationId] = Promise()
   private[utils] var state: SparkApp.State = SparkApp.State.STARTING
   private var yarnDiagnostics: IndexedSeq[String] = IndexedSeq.empty[String]
+  private var appInfo = AppInfo()
 
 Review comment:
   @yiheng It changes at this line: 
https://github.com/apache/incubator-livy/pull/242/files#diff-e0ea718a9f6261f348cba9e32f25da07R321




[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn

2019-11-26 Thread GitBox
runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy 
should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r347187839
 
 

 ##
 File path: server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
 ##
 @@ -98,7 +116,35 @@ object SparkYarnApp extends Logging {
 }
   }
 
-
+  private val yarnAppMonitorThread = new Thread() {
+override def run() : Unit = {
+  val executor = Executors.newSingleThreadExecutor
+  while (true) {
+for ((app, appTag) <- appMap) {
+  Future {
 
 Review comment:
   @jerryshao The main purpose of the executor inside the future is to 
bound the run time of every thread in the thread pool in case the RPC 
blocks. Once the RPC blocks, the thread can be canceled with 
`handler.cancel(true)`. Additionally, `val executor = 
Executors.newSingleThreadExecutor` should be inside the `Future`, and I have 
updated it. 
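
   The idea can be sketched as follows. This is an assumption-laden 
illustration rather than the pull request's code: `TimeBoxedCall` and 
`callWithTimeout` are hypothetical names, and `body` stands in for the blocking 
RPC. The call runs on its own single-thread executor, and if it does not finish 
in time, `handler.cancel(true)` interrupts the thread that is running it.

```scala
import java.util.concurrent.{Callable, Executors, TimeUnit, TimeoutException}

object TimeBoxedCall {
  // Run `body` on a dedicated single-thread executor and wait at most timeoutMs.
  // Returns None when the call had to be cancelled.
  def callWithTimeout[T](timeoutMs: Long)(body: => T): Option[T] = {
    val executor = Executors.newSingleThreadExecutor()
    val handler = executor.submit(new Callable[T] {
      override def call(): T = body
    })
    try {
      Some(handler.get(timeoutMs, TimeUnit.MILLISECONDS))
    } catch {
      case _: TimeoutException =>
        handler.cancel(true) // true = interrupt the thread running `body`
        None
    } finally {
      executor.shutdownNow() // release the executor's thread in every case
    }
  }
}
```

   The trade-off is one extra executor per call, which is the cost of being 
able to cancel a single stuck call without affecting the others.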




[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn

2019-11-26 Thread GitBox
runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy 
should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r350674099
 
 

 ##
 File path: server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
 ##
 @@ -122,10 +162,13 @@ class SparkYarnApp private[utils] (
   with Logging {
   import SparkYarnApp._
 
+  appQueue.add(this)
+
   private var killed = false
-  private val appIdPromise: Promise[ApplicationId] = Promise()
   private[utils] var state: SparkApp.State = SparkApp.State.STARTING
   private var yarnDiagnostics: IndexedSeq[String] = IndexedSeq.empty[String]
+  private var appInfo = AppInfo()
 
 Review comment:
   @yiheng It changes at this line: 
https://github.com/apache/incubator-livy/pull/242/files#diff-e0ea718a9f6261f348cba9e32f25da07R310




[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn

2019-11-26 Thread GitBox
runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy 
should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r348449088
 
 

 ##
 File path: server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
 ##
 @@ -98,7 +118,36 @@ object SparkYarnApp extends Logging {
 }
   }
 
+  val yarnAppMonitorThreadPool: ExecutorService =
+Executors.newFixedThreadPool(yarnAppMonitorThreadPoolSize)
+
+  for (i <- 0 until yarnAppMonitorThreadPoolSize) {
+yarnAppMonitorThreadPool.execute(new Runnable {
 
 Review comment:
   @jerryshao updated




[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn

2019-11-26 Thread GitBox
runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy 
should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r350700704
 
 

 ##
 File path: server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
 ##
 @@ -165,40 +231,24 @@ class SparkYarnApp private[utils] (
   /**
* Find the corresponding YARN application id from an application tag.
*
-   * @param appTag The application tag tagged on the target application.
-   *   If the tag is not unique, it returns the first application it found.
-   *   It will be converted to lower case to match YARN's behaviour.
* @return ApplicationId or the failure.
*/
-  @tailrec
-  private def getAppIdFromTag(
-  appTag: String,
-  pollInterval: Duration,
-  deadline: Deadline): ApplicationId = {
-if (isProcessErrExit()) {
-  throw new IllegalStateException("spark-submit start failed")
-}
-
-val appTagLowerCase = appTag.toLowerCase()
-
-// FIXME Should not loop thru all YARN applications but YarnClient doesn't offer an API.
-// Consider calling rmClient in YarnClient directly.
-yarnClient.getApplications(appType).asScala.find(_.getApplicationTags.contains(appTagLowerCase))
-match {
-  case Some(app) => app.getApplicationId
-  case None =>
-if (deadline.isOverdue) {
-  process.foreach(_.destroy())
-  leakedAppTags.put(appTag, System.currentTimeMillis())
+  private def getAppId(): ApplicationId = {
 
 Review comment:
   @yiheng @jahstreet updated.




[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn

2019-11-26 Thread GitBox
runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy 
should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r350701490
 
 

 ##
 File path: server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
 ##
 @@ -311,15 +322,22 @@ class SparkYarnApp private[utils] (
 }
   }
 
+  yarnTagToAppIdFailedTimes = 0
+
   debug(s"$appId $state ${yarnDiagnostics.mkString(" ")}")
 } catch {
-  case _: InterruptedException =>
-yarnDiagnostics = ArrayBuffer("Session stopped by user.")
-changeState(SparkApp.State.KILLED)
+  // throw IllegalStateException when getAppId failed
+  case e: IllegalStateException =>
+yarnTagToAppIdFailedTimes += 1
+if (yarnTagToAppIdFailedTimes > getYarnTagToAppIdMaxFailedTimes(livyConf)) {
 
 Review comment:
   @yiheng updated




[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn

2019-11-26 Thread GitBox
runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy 
should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r348332474
 
 

 ##
 File path: server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
 ##
 @@ -98,7 +118,36 @@ object SparkYarnApp extends Logging {
 }
   }
 
+  val yarnAppMonitorThreadPool: ExecutorService =
+Executors.newFixedThreadPool(yarnAppMonitorThreadPoolSize)
+
+  for (i <- 0 until yarnAppMonitorThreadPoolSize) {
+yarnAppMonitorThreadPool.execute(new Runnable {
+  override def run(): Unit = {
+var loop = true
+try {
+  while (loop) {
+val app = appQueue.poll()
+if (app != null) {
+  app.monitorSparkYarnApp()
+}
+
+if (app.isRunning) {
+  appQueue.add(app)
+}
+
+Thread.sleep(yarnPollInterval)
+  }
+} catch {
+  case _: InterruptedException => loop = false
 
 Review comment:
   I have changed it, thanks.

