[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r350701490

File path: server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala

@@ -311,15 +322,22 @@ class SparkYarnApp private[utils] (
        }
      }
+     yarnTagToAppIdFailedTimes = 0
+     debug(s"$appId $state ${yarnDiagnostics.mkString(" ")}")
    } catch {
-     case _: InterruptedException =>
-       yarnDiagnostics = ArrayBuffer("Session stopped by user.")
-       changeState(SparkApp.State.KILLED)
+     // throw IllegalStateException when getAppId failed
+     case e: IllegalStateException =>
+       yarnTagToAppIdFailedTimes += 1
+       if (yarnTagToAppIdFailedTimes > getYarnTagToAppIdMaxFailedTimes(livyConf)) {

Review comment:
@yiheng updated

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
With regards, Apache Git Services
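The retry budget shown in the hunk above (count consecutive failed appId lookups, reset on success, give up past a threshold) can be modeled in isolation. This is a minimal sketch with assumed names (`MonitorState`, `maxFailedTimes`), not the actual SparkYarnApp code.

```scala
// Sketch of the getAppId retry budget discussed in the review.
// All names here are illustrative assumptions.
object RetrySketch {
  final case class MonitorState(failedTimes: Int, failed: Boolean)

  // Each failed lookup increments the counter; exceeding the budget
  // marks the app as failed instead of retrying forever.
  def onLookupFailure(s: MonitorState, maxFailedTimes: Int): MonitorState = {
    val n = s.failedTimes + 1
    MonitorState(n, failed = n > maxFailedTimes)
  }

  // A successful lookup resets the counter, mirroring
  // `yarnTagToAppIdFailedTimes = 0` in the diff.
  def onLookupSuccess(s: MonitorState): MonitorState =
    MonitorState(0, failed = false)
}
```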
[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r348332474

File path: server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala

@@ -98,7 +118,36 @@ object SparkYarnApp extends Logging {
      }
    }
+
+  val yarnAppMonitorThreadPool: ExecutorService =
+    Executors.newFixedThreadPool(yarnAppMonitorThreadPoolSize)
+
+  for (i <- 0 until yarnAppMonitorThreadPoolSize) {
+    yarnAppMonitorThreadPool.execute(new Runnable {
+      override def run(): Unit = {
+        var loop = true
+        try {
+          while (loop) {
+            val app = appQueue.poll()
+            if (app != null) {
+              app.monitorSparkYarnApp()
+            }
+
+            if (app.isRunning) {
+              appQueue.add(app)
+            }
+
+            Thread.sleep(yarnPollInterval)
+          }
+        } catch {
+          case _: InterruptedException => loop = false

Review comment:
I have changed it. Thanks.
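The centralized monitoring scheme under review, a fixed pool of workers repeatedly draining a shared queue and re-enqueuing apps that are still running, can be sketched as a self-contained model. `FakeApp` and all names are illustrative stand-ins; note that this sketch keeps the `isRunning` check inside the null guard, unlike the hunk above, where `app.isRunning` can be reached with a null `app`.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Sketch of a fixed-size monitor pool draining a shared app queue.
// All names are illustrative; FakeApp stands in for SparkYarnApp.
object MonitorPoolSketch {
  final class FakeApp(polls: Int) {
    private val remaining = new AtomicInteger(polls)
    def monitor(): Unit = remaining.decrementAndGet()  // one poll of YARN
    def isRunning: Boolean = remaining.get() > 0
  }

  // Blocks until every app has finished its polls.
  def run(apps: Seq[FakeApp], poolSize: Int): Unit = {
    val queue = new ConcurrentLinkedQueue[FakeApp]()
    apps.foreach(queue.add)
    val pool = Executors.newFixedThreadPool(poolSize)
    val active = new AtomicInteger(apps.size)
    for (_ <- 0 until poolSize) {
      pool.execute(new Runnable {
        override def run(): Unit =
          try {
            while (active.get() > 0) {
              val app = queue.poll()
              if (app != null) {            // guard before touching the app
                app.monitor()
                if (app.isRunning) queue.add(app)  // re-enqueue live apps
                else active.decrementAndGet()
              }
              Thread.sleep(1)               // stands in for yarnPollInterval
            }
          } catch {
            case _: InterruptedException => // exit the loop on shutdown
          }
      })
    }
    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)
  }
}
```

A queue shared by N workers means the server runs N monitor threads total, rather than one thread per session, which is the point of LIVY-336.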
[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r347187839

File path: server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala

@@ -98,7 +116,35 @@ object SparkYarnApp extends Logging {
      }
    }
-
+  private val yarnAppMonitorThread = new Thread() {
+    override def run() : Unit = {
+      val executor = Executors.newSingleThreadExecutor
+      while (true) {
+        for ((app, appTag) <- appMap) {
+          Future {

Review comment:
@jerryshao The main purpose of the executor inside the Future is to bound the run time of every thread in the thread pool in case an RPC blocks. Once an RPC blocks, the thread can be cancelled via `handler.cancel(true)`. Additionally, `val executor = Executors.newSingleThreadExecutor` should be inside the `Future`, and I have updated it.
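The pattern described in this comment, submitting a potentially blocking RPC to an executor and cancelling it via the returned handle when it exceeds a deadline, can be sketched as follows. `callWithTimeout` and the stand-in body are assumptions for illustration, not Livy's actual code.

```scala
import java.util.concurrent.{Callable, Executors, TimeUnit, TimeoutException}

// Sketch of bounding a blocking call with Future.get(timeout) and
// cancel(true), as described in the review. Names are illustrative.
object BoundedCallSketch {
  private val executor = Executors.newSingleThreadExecutor()

  def callWithTimeout[T](timeoutMs: Long)(body: => T): Option[T] = {
    val handler = executor.submit(new Callable[T] {
      override def call(): T = body  // stands in for the YARN RPC
    })
    try Some(handler.get(timeoutMs, TimeUnit.MILLISECONDS))
    catch {
      case _: TimeoutException =>
        handler.cancel(true)  // interrupt the stuck call, per the review
        None
    }
  }

  def shutdown(): Unit = executor.shutdownNow()
}
```

`cancel(true)` only helps if the blocked call responds to interruption; an RPC stuck in non-interruptible I/O would leave the worker thread occupied, which is one reason to keep such calls off the shared monitor threads.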
[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r350674099

File path: server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala

@@ -122,10 +162,13 @@ class SparkYarnApp private[utils] (
    with Logging {
    import SparkYarnApp._

+  appQueue.add(this)
+
    private var killed = false
-  private val appIdPromise: Promise[ApplicationId] = Promise()
    private[utils] var state: SparkApp.State = SparkApp.State.STARTING
    private var yarnDiagnostics: IndexedSeq[String] = IndexedSeq.empty[String]
+  private var appInfo = AppInfo()

Review comment:
@yiheng it changes at this line: https://github.com/apache/incubator-livy/pull/242/files#diff-e0ea718a9f6261f348cba9e32f25da07R310
[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r348449088

File path: server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala

@@ -98,7 +118,36 @@ object SparkYarnApp extends Logging {
      }
    }
+
+  val yarnAppMonitorThreadPool: ExecutorService =
+    Executors.newFixedThreadPool(yarnAppMonitorThreadPoolSize)
+
+  for (i <- 0 until yarnAppMonitorThreadPoolSize) {
+    yarnAppMonitorThreadPool.execute(new Runnable {

Review comment:
@jerryshao updated
[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r350700704

File path: server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala

@@ -165,40 +231,24 @@ class SparkYarnApp private[utils] (
    /**
     * Find the corresponding YARN application id from an application tag.
     *
-   * @param appTag The application tag tagged on the target application.
-   *               If the tag is not unique, it returns the first application it found.
-   *               It will be converted to lower case to match YARN's behaviour.
     * @return ApplicationId or the failure.
     */
-  @tailrec
-  private def getAppIdFromTag(
-      appTag: String,
-      pollInterval: Duration,
-      deadline: Deadline): ApplicationId = {
-    if (isProcessErrExit()) {
-      throw new IllegalStateException("spark-submit start failed")
-    }
-
-    val appTagLowerCase = appTag.toLowerCase()
-
-    // FIXME Should not loop thru all YARN applications but YarnClient doesn't offer an API.
-    // Consider calling rmClient in YarnClient directly.
-    yarnClient.getApplications(appType).asScala.find(_.getApplicationTags.contains(appTagLowerCase))
-    match {
-      case Some(app) => app.getApplicationId
-      case None =>
-        if (deadline.isOverdue) {
-          process.foreach(_.destroy())
-          leakedAppTags.put(appTag, System.currentTimeMillis())
+  private def getAppId(): ApplicationId = {

Review comment:
@yiheng @jahstreet updated.
[GitHub] [incubator-livy] Limmen commented on a change in pull request #165: [LIVY-581] Fix edge-case where livy overrides user-provided spark properties instead of appending
Limmen commented on a change in pull request #165: [LIVY-581] Fix edge-case where livy overrides user-provided spark properties instead of appending
URL: https://github.com/apache/incubator-livy/pull/165#discussion_r350850028

File path: server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala

@@ -286,16 +286,33 @@ object InteractiveSession extends Logging {
    }

    def mergeHiveSiteAndHiveDeps(sparkMajorVersion: Int): Unit = {
-    val sparkFiles = conf.get("spark.files").map(_.split(",")).getOrElse(Array.empty[String])
-    hiveSiteFile(sparkFiles, livyConf) match {
+    val yarnFiles = conf.get(LivyConf.SPARK_YARN_DIST_FILES)
+      .map(_.split(",")).getOrElse(Array.empty[String])
+    val sparkFiles = conf.get(LivyConf.SPARK_FILES)
+      .map(_.split(",")).getOrElse(Array.empty[String])
+    var files = Array.empty[String]
+    if (!sparkFiles.isEmpty || yarnFiles.isEmpty) files = sparkFiles else files = yarnFiles
+    hiveSiteFile(files, livyConf) match {
      case (_, true) =>
        debug("Enable HiveContext because hive-site.xml is found in user request.")
-        mergeConfList(datanucleusJars(livyConf, sparkMajorVersion), LivyConf.SPARK_JARS)
+        if (conf.get(LivyConf.SPARK_JARS) != None || conf.get(LivyConf.SPARK_YARN_JARS) == None) {

Review comment:
ping @yiheng
[GitHub] [incubator-livy] mgaido91 commented on issue #238: [LIVY-689] Deliver stage process message to the end user using thriftserver
mgaido91 commented on issue #238: [LIVY-689] Deliver stage process message to the end user using thriftserver
URL: https://github.com/apache/incubator-livy/pull/238#issuecomment-558720635

@huianyi I don't see the previous comments being resolved. You are still keeping logs on the server side with a timer instead of providing the progress on the client's request. So my point is: Hive handles this in a different way. Please see https://github.com/apache/hive/blob/209096a71b25a3d75dca1930f825c8c10b99d2b9/service/src/java/org/apache/hive/service/cli/CLIService.java#L482. I think we should try to handle it that way if possible. If it is not possible, we need to clarify why not and why we have no choice other than this one.
[GitHub] [incubator-livy] yiheng commented on a change in pull request #165: [LIVY-581] Fix edge-case where livy overrides user-provided spark properties instead of appending
yiheng commented on a change in pull request #165: [LIVY-581] Fix edge-case where livy overrides user-provided spark properties instead of appending
URL: https://github.com/apache/incubator-livy/pull/165#discussion_r351088916

File path: server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala

@@ -286,16 +286,33 @@ object InteractiveSession extends Logging {
    }

    def mergeHiveSiteAndHiveDeps(sparkMajorVersion: Int): Unit = {
-    val sparkFiles = conf.get("spark.files").map(_.split(",")).getOrElse(Array.empty[String])
-    hiveSiteFile(sparkFiles, livyConf) match {
+    val yarnFiles = conf.get(LivyConf.SPARK_YARN_DIST_FILES)
+      .map(_.split(",")).getOrElse(Array.empty[String])
+    val sparkFiles = conf.get(LivyConf.SPARK_FILES)
+      .map(_.split(",")).getOrElse(Array.empty[String])
+    var files = Array.empty[String]
+    if (!sparkFiles.isEmpty || yarnFiles.isEmpty) files = sparkFiles else files = yarnFiles
+    hiveSiteFile(files, livyConf) match {
      case (_, true) =>
        debug("Enable HiveContext because hive-site.xml is found in user request.")
-        mergeConfList(datanucleusJars(livyConf, sparkMajorVersion), LivyConf.SPARK_JARS)
+        if (conf.get(LivyConf.SPARK_JARS) != None || conf.get(LivyConf.SPARK_YARN_JARS) == None) {

Review comment:
@Limmen Sorry for missing the message.

> The behavior is that if both spark.jars and spark.yarn.jars are set, spark ignores the yarn-property.

If that is Spark's behavior, it is fine to deal with it like this.
[GitHub] [incubator-livy] yiheng commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
yiheng commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r351092771

File path: server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala

@@ -254,50 +268,45 @@ class SparkYarnApp private[utils] (
      }
    }

-  // Exposed for unit test.
-  // TODO Instead of spawning a thread for every session, create a centralized thread and
-  // batch YARN queries.
-  private[utils] val yarnAppMonitorThread = Utils.startDaemonThread(s"yarnAppMonitorThread-$this") {
+  private var yarnTagToAppIdFailedTimes: Int = _
+
+  private def failToMonitor(): Unit = {
+    changeState(SparkApp.State.FAILED)
+    process.foreach(_.destroy())
+    leakedAppTags.put(appTag, System.currentTimeMillis())
+  }
+
+  private def monitorSparkYarnApp(): Unit = {
    try {
+      if (killed) {
+        changeState(SparkApp.State.KILLED)
+      } else if (isProcessErrExit()) {
+        changeState(SparkApp.State.FAILED)
+      }
      // If appId is not known, query YARN by appTag to get it.
-      val appId = try {
-        appIdOption.map(ConverterUtils.toApplicationId).getOrElse {
-          val pollInterval = getYarnPollInterval(livyConf)
-          val deadline = getYarnTagToAppIdTimeout(livyConf).fromNow
-          getAppIdFromTag(appTag, pollInterval, deadline)
+      if (appId.isEmpty) {
+        appId = getAppId()
+        if (appId.isEmpty) {
+          throw new IllegalStateException(s"No YARN application is found with tag " +

Review comment:
Instead of throwing an exception, can we move the exception-handling code here?
[GitHub] [incubator-livy] yiheng commented on a change in pull request #165: [LIVY-581] Fix edge-case where livy overrides user-provided spark properties instead of appending
yiheng commented on a change in pull request #165: [LIVY-581] Fix edge-case where livy overrides user-provided spark properties instead of appending
URL: https://github.com/apache/incubator-livy/pull/165#discussion_r351091430

File path: server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala

@@ -332,7 +348,11 @@ object InteractiveSession extends Logging {
      LivySparkUtils.formatSparkVersion(livyConf.get(LivyConf.LIVY_SPARK_VERSION))
    val scalaVersion = livyConf.get(LivyConf.LIVY_SPARK_SCALA_VERSION)

-    mergeConfList(livyJars(livyConf, scalaVersion), LivyConf.SPARK_JARS)
+    if (conf.get(LivyConf.SPARK_JARS) != None || conf.get(LivyConf.SPARK_JARS) == None){

Review comment:
For the spark file conf

```scala
val (files, mergeFileConf) = if (!sparkFiles.isEmpty || yarnFiles.isEmpty) {
  (sparkFiles, LivyConf.SPARK_FILES)
} else {
  (yarnFiles, LivyConf.SPARK_YARN_DIST_FILES)
}
...
mergeConfList(List(file.getAbsolutePath), mergeFileConf)
```
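The selection pattern suggested in this comment can be written as a small runnable sketch. Plain strings stand in for the `LivyConf` keys, and `pickFileConf` is an illustrative name, not Livy's API.

```scala
// Sketch of the reviewer's suggestion: prefer spark.files when it is set
// (or when neither is set), otherwise fall back to spark.yarn.dist.files,
// returning both the file list and the key to merge into.
object ConfSelectSketch {
  val SPARK_FILES = "spark.files"
  val SPARK_YARN_DIST_FILES = "spark.yarn.dist.files"

  def pickFileConf(conf: Map[String, String]): (Array[String], String) = {
    val sparkFiles =
      conf.get(SPARK_FILES).map(_.split(",")).getOrElse(Array.empty[String])
    val yarnFiles =
      conf.get(SPARK_YARN_DIST_FILES).map(_.split(",")).getOrElse(Array.empty[String])
    if (sparkFiles.nonEmpty || yarnFiles.isEmpty) (sparkFiles, SPARK_FILES)
    else (yarnFiles, SPARK_YARN_DIST_FILES)
  }
}
```

Returning the key alongside the list is what fixes the original edge case: the later `mergeConfList` call appends to whichever property the user actually set instead of silently overriding it.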
[GitHub] [incubator-livy] yiheng commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
yiheng commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r351093478

File path: server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala

@@ -324,15 +333,22 @@ class SparkYarnApp private[utils] (
      }
    }
+     yarnTagToAppIdFailedTimes = 0
+     debug(s"$appId $state ${yarnDiagnostics.mkString(" ")}")
    } catch {
-     case _: InterruptedException =>
-       yarnDiagnostics = ArrayBuffer("Session stopped by user.")
-       changeState(SparkApp.State.KILLED)
+     // throw IllegalStateException when getAppId failed

Review comment:
Why was the InterruptedException handling code removed?
[GitHub] [incubator-livy] huianyi commented on issue #238: [LIVY-689] Deliver stage process message to the end user using thriftserver
huianyi commented on issue #238: [LIVY-689] Deliver stage process message to the end user using thriftserver
URL: https://github.com/apache/incubator-livy/pull/238#issuecomment-558899022

@mgaido91 Have I missed something? If we use beeline to connect to HiveServer, HiveServer catches each job status from the AppMaster and saves these messages in operation logs, which are files located on the HiveServer's machine. Clients then pull these messages in a while loop. At first, I implemented this in a push-based way: the RSCDriver pushed its job status to the thrift server. In the current implementation, our thrift server pulls the job status from the RSCDriver and stores it in a queue (OperationMessages). The beeline client likewise pulls these messages in a while loop. Please correct me if I'm wrong, thanks.
[GitHub] [incubator-livy] mgaido91 commented on issue #238: [LIVY-689] Deliver stage process message to the end user using thriftserver
mgaido91 commented on issue #238: [LIVY-689] Deliver stage process message to the end user using thriftserver
URL: https://github.com/apache/incubator-livy/pull/238#issuecomment-558972949

@huianyi I think what we would like to achieve here is: do not use the operation logs, so that we don't have to keep anything on the server side (by this I mean the Livy server) and hence avoid putting memory pressure on it. The ideal solution would be:
- the beeline client polls for the status;
- the Livy server queries the Spark session for the query status;
- the Livy server forwards the status back to the beeline client.

This way, we need to store nothing in the Livy server's memory. This seems to me to be the approach used in Hive: my understanding is that Hive informs the beeline client about the status and progress of the operation with the code I pointed out in the previous link. So I think we should follow that approach to give feedback on the query status/progress. Hope this is clearer; if not, please ask about whatever is still unclear. Thanks.
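The pull-based flow described in this comment can be sketched with stand-in types (all names here are illustrative, not Livy's thrift API): the server holds no per-operation log and answers each poll by querying the session directly.

```scala
// Sketch of the pull-based status flow: client polls the server, the
// server queries the session on demand and forwards the result, keeping
// nothing in server memory. FakeSession/Server are illustrative.
object PullStatusSketch {
  sealed trait JobStatus
  case object Running extends JobStatus
  case object Finished extends JobStatus

  // Stand-in for the Spark session, which knows each job's live status.
  final class FakeSession(statuses: Map[String, JobStatus]) {
    def currentStatus(opId: String): JobStatus =
      statuses.getOrElse(opId, Finished)
  }

  // The server keeps no progress queue or timer; each poll is answered
  // by asking the session and forwarding its answer.
  final class Server(session: FakeSession) {
    def pollStatus(opId: String): JobStatus = session.currentStatus(opId)
  }
}
```

Contrast with the queue-based design in the PR: there the server buffers `OperationMessages` between polls, which is exactly the server-side state this flow avoids.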