[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn

2019-11-26 Thread GitBox
runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy 
should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r350701490
 
 

 ##
 File path: server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
 ##
 @@ -311,15 +322,22 @@ class SparkYarnApp private[utils] (
 }
   }
 
+  yarnTagToAppIdFailedTimes = 0
+
   debug(s"$appId $state ${yarnDiagnostics.mkString(" ")}")
 } catch {
-  case _: InterruptedException =>
-yarnDiagnostics = ArrayBuffer("Session stopped by user.")
-changeState(SparkApp.State.KILLED)
+  // throw IllegalStateException when getAppId failed
+  case e: IllegalStateException =>
+yarnTagToAppIdFailedTimes += 1
+if (yarnTagToAppIdFailedTimes > getYarnTagToAppIdMaxFailedTimes(livyConf)) {
 
 Review comment:
   @yiheng updated


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn

2019-11-26 Thread GitBox
runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy 
should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r348332474
 
 

 ##
 File path: server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
 ##
 @@ -98,7 +118,36 @@ object SparkYarnApp extends Logging {
 }
   }
 
+  val yarnAppMonitorThreadPool: ExecutorService =
+Executors.newFixedThreadPool(yarnAppMonitorThreadPoolSize)
+
+  for (i <- 0 until yarnAppMonitorThreadPoolSize) {
+yarnAppMonitorThreadPool.execute(new Runnable {
+  override def run(): Unit = {
+var loop = true
+try {
+  while (loop) {
+val app = appQueue.poll()
+if (app != null) {
+  app.monitorSparkYarnApp()
+  if (app.isRunning) {
+appQueue.add(app)
+  }
+}
+
+Thread.sleep(yarnPollInterval)
+  }
+} catch {
+  case _: InterruptedException => loop = false
 
 Review comment:
   I have changed it, thanks.
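The design in the quoted diff can be sketched self-contained as below: a fixed pool of monitor threads drains a shared queue instead of spawning one thread per app. `MonitoredApp` stands in for `SparkYarnApp`, and a counter stands in for a real YARN status query; all names here are illustrative, not Livy's.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Stand-in for SparkYarnApp: "running" until it has been polled enough times.
class MonitoredApp(pollsUntilDone: Int) {
  val remaining = new AtomicInteger(pollsUntilDone)
  def monitor(): Unit = remaining.decrementAndGet() // one simulated YARN poll
  def isRunning: Boolean = remaining.get() > 0
}

object MonitorPool {
  def runUntilDone(apps: Seq[MonitoredApp], poolSize: Int, pollIntervalMs: Long): Unit = {
    val queue = new ConcurrentLinkedQueue[MonitoredApp]()
    apps.foreach(a => queue.add(a))
    val pool = Executors.newFixedThreadPool(poolSize)
    for (_ <- 0 until poolSize) {
      pool.execute(new Runnable {
        override def run(): Unit =
          try {
            while (true) {
              val app = queue.poll()
              if (app != null) {                  // guard before touching `app`
                app.monitor()
                if (app.isRunning) queue.add(app) // re-enqueue apps still running
              }
              Thread.sleep(pollIntervalMs)
            }
          } catch {
            case _: InterruptedException => ()    // pool shutdown: exit the loop
          }
      })
    }
    // Wait for all apps to finish, then stop the worker threads.
    while (apps.exists(_.isRunning)) Thread.sleep(pollIntervalMs)
    pool.shutdownNow()
    pool.awaitTermination(1, TimeUnit.SECONDS)
  }
}
```

Note the null guard: `poll()` on an empty queue returns null, so every use of `app` must sit inside the `app != null` branch.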




[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn

2019-11-26 Thread GitBox
runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy 
should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r347187839
 
 

 ##
 File path: server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
 ##
 @@ -98,7 +116,35 @@ object SparkYarnApp extends Logging {
 }
   }
 
-
+  private val yarnAppMonitorThread = new Thread() {
+override def run() : Unit = {
+  val executor = Executors.newSingleThreadExecutor
+  while (true) {
+for ((app, appTag) <- appMap) {
+  Future {
 
 Review comment:
   @jerryshao The main purpose of putting the executor inside the Future is to control the run time of every thread in the thread pool in case the RPC blocks. If an RPC call blocks, the thread can be cancelled by `handler.cancel(true)`. Additionally, `val executor = Executors.newSingleThreadExecutor` should be inside the `Future`, and I have updated it.
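The cancellation pattern described above can be sketched self-contained as below: a task that may block (standing in for a YARN RPC) is submitted to a single-thread executor, its run time is bounded with a timed `get`, and it is cancelled with `handler.cancel(true)` on timeout. The names here are illustrative, not Livy's.

```scala
import java.util.concurrent.{Callable, Executors, TimeUnit, TimeoutException}

object CancellableCall {
  // Returns true if the call had to be cancelled, false if it completed in time.
  def callWithTimeout(timeoutMs: Long, workMs: Long): Boolean = {
    val executor = Executors.newSingleThreadExecutor()
    val handler = executor.submit(new Callable[String] {
      override def call(): String = {
        Thread.sleep(workMs) // stands in for an RPC that may block
        "done"
      }
    })
    val cancelled =
      try {
        handler.get(timeoutMs, TimeUnit.MILLISECONDS) // bound the run time
        false
      } catch {
        case _: TimeoutException =>
          handler.cancel(true) // interrupt the stuck worker thread
          true
      }
    executor.shutdownNow()
    cancelled
  }
}
```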




[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn

2019-11-26 Thread GitBox
runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy 
should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r350674099
 
 

 ##
 File path: server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
 ##
 @@ -122,10 +162,13 @@ class SparkYarnApp private[utils] (
   with Logging {
   import SparkYarnApp._
 
+  appQueue.add(this)
+
   private var killed = false
-  private val appIdPromise: Promise[ApplicationId] = Promise()
   private[utils] var state: SparkApp.State = SparkApp.State.STARTING
   private var yarnDiagnostics: IndexedSeq[String] = IndexedSeq.empty[String]
+  private var appInfo = AppInfo()
 
 Review comment:
   @yiheng it changes at this line: 
https://github.com/apache/incubator-livy/pull/242/files#diff-e0ea718a9f6261f348cba9e32f25da07R310




[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn

2019-11-26 Thread GitBox
runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy 
should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r348449088
 
 

 ##
 File path: server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
 ##
 @@ -98,7 +118,36 @@ object SparkYarnApp extends Logging {
 }
   }
 
+  val yarnAppMonitorThreadPool: ExecutorService =
+Executors.newFixedThreadPool(yarnAppMonitorThreadPoolSize)
+
+  for (i <- 0 until yarnAppMonitorThreadPoolSize) {
+yarnAppMonitorThreadPool.execute(new Runnable {
 
 Review comment:
   @jerryshao updated




[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn

2019-11-26 Thread GitBox
runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy 
should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r350700704
 
 

 ##
 File path: server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
 ##
 @@ -165,40 +231,24 @@ class SparkYarnApp private[utils] (
   /**
* Find the corresponding YARN application id from an application tag.
*
-   * @param appTag The application tag tagged on the target application.
-   *   If the tag is not unique, it returns the first application it found.
-   *   It will be converted to lower case to match YARN's behaviour.
   * @return ApplicationId or the failure.
   */
-  @tailrec
-  private def getAppIdFromTag(
-  appTag: String,
-  pollInterval: Duration,
-  deadline: Deadline): ApplicationId = {
-if (isProcessErrExit()) {
-  throw new IllegalStateException("spark-submit start failed")
-}
-
-val appTagLowerCase = appTag.toLowerCase()
-
-// FIXME Should not loop thru all YARN applications but YarnClient doesn't offer an API.
-// Consider calling rmClient in YarnClient directly.
-yarnClient.getApplications(appType).asScala.find(_.getApplicationTags.contains(appTagLowerCase))
-match {
-  case Some(app) => app.getApplicationId
-  case None =>
-if (deadline.isOverdue) {
-  process.foreach(_.destroy())
-  leakedAppTags.put(appTag, System.currentTimeMillis())
+  private def getAppId(): ApplicationId = {
 
 Review comment:
   @yiheng @jahstreet updated.




[GitHub] [incubator-livy] Limmen commented on a change in pull request #165: [LIVY-581] Fix edge-case where livy overrides user-provided spark properties instead of appending

2019-11-26 Thread GitBox
Limmen commented on a change in pull request #165: [LIVY-581] Fix edge-case 
where livy overrides user-provided spark properties instead of appending
URL: https://github.com/apache/incubator-livy/pull/165#discussion_r350850028
 
 

 ##
 File path: 
server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
 ##
 @@ -286,16 +286,33 @@ object InteractiveSession extends Logging {
 }
 
 def mergeHiveSiteAndHiveDeps(sparkMajorVersion: Int): Unit = {
-  val sparkFiles = conf.get("spark.files").map(_.split(",")).getOrElse(Array.empty[String])
-  hiveSiteFile(sparkFiles, livyConf) match {
+  val yarnFiles = conf.get(LivyConf.SPARK_YARN_DIST_FILES)
+  .map(_.split(",")).getOrElse(Array.empty[String])
+  val sparkFiles = conf.get(LivyConf.SPARK_FILES)
+   .map(_.split(",")).getOrElse(Array.empty[String])
+  var files = Array.empty[String]
+  if (!sparkFiles.isEmpty || yarnFiles.isEmpty) files = sparkFiles else files = yarnFiles
+  hiveSiteFile(files, livyConf) match {
 case (_, true) =>
   debug("Enable HiveContext because hive-site.xml is found in user request.")
-  mergeConfList(datanucleusJars(livyConf, sparkMajorVersion), LivyConf.SPARK_JARS)
+  if (conf.get(LivyConf.SPARK_JARS) != None || conf.get(LivyConf.SPARK_YARN_JARS) == None) {
 
 Review comment:
   ping @yiheng 




[GitHub] [incubator-livy] mgaido91 commented on issue #238: [LIVY-689] Deliver stage process message to the end user using thriftserver

2019-11-26 Thread GitBox
mgaido91 commented on issue #238: [LIVY-689] Deliver stage process message to 
the end user using thriftserver
URL: https://github.com/apache/incubator-livy/pull/238#issuecomment-558720635
 
 
   @huianyi I don't see the previous comments being resolved. You're still keeping logs on the server side with a timer, instead of providing the progress on the client's request.
   
   So my point is: Hive handles this in a different way. Please see 
https://github.com/apache/hive/blob/209096a71b25a3d75dca1930f825c8c10b99d2b9/service/src/java/org/apache/hive/service/cli/CLIService.java#L482.
   
   I think we should try to handle it that way if possible. If that is not possible, we need to clarify why not, and why we have no choice other than this one.




[GitHub] [incubator-livy] yiheng commented on a change in pull request #165: [LIVY-581] Fix edge-case where livy overrides user-provided spark properties instead of appending

2019-11-26 Thread GitBox
yiheng commented on a change in pull request #165: [LIVY-581] Fix edge-case 
where livy overrides user-provided spark properties instead of appending
URL: https://github.com/apache/incubator-livy/pull/165#discussion_r351088916
 
 

 ##
 File path: 
server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
 ##
 @@ -286,16 +286,33 @@ object InteractiveSession extends Logging {
 }
 
 def mergeHiveSiteAndHiveDeps(sparkMajorVersion: Int): Unit = {
-  val sparkFiles = conf.get("spark.files").map(_.split(",")).getOrElse(Array.empty[String])
-  hiveSiteFile(sparkFiles, livyConf) match {
+  val yarnFiles = conf.get(LivyConf.SPARK_YARN_DIST_FILES)
+  .map(_.split(",")).getOrElse(Array.empty[String])
+  val sparkFiles = conf.get(LivyConf.SPARK_FILES)
+   .map(_.split(",")).getOrElse(Array.empty[String])
+  var files = Array.empty[String]
+  if (!sparkFiles.isEmpty || yarnFiles.isEmpty) files = sparkFiles else files = yarnFiles
+  hiveSiteFile(files, livyConf) match {
 case (_, true) =>
   debug("Enable HiveContext because hive-site.xml is found in user request.")
-  mergeConfList(datanucleusJars(livyConf, sparkMajorVersion), LivyConf.SPARK_JARS)
+  if (conf.get(LivyConf.SPARK_JARS) != None || conf.get(LivyConf.SPARK_YARN_JARS) == None) {
 
 Review comment:
   @Limmen Sorry for missing the message.
   > The behavior is that if both spark.jars and spark.yarn.jars are set, spark 
ignores the yarn-property.
   
   If that's Spark's behavior, it's fine to handle it like this.
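The precedence rule quoted above (if both `spark.jars` and `spark.yarn.jars` are set, Spark ignores the yarn property) can be sketched as below, assuming a plain Map-backed config; `JarsPrecedence` and `effectiveJarsKey` are illustrative names, not Livy or Spark APIs.

```scala
object JarsPrecedence {
  val SPARK_JARS = "spark.jars"
  val SPARK_YARN_JARS = "spark.yarn.jars"

  // If spark.jars is set (or spark.yarn.jars is not), spark.jars wins;
  // only when spark.jars is absent does spark.yarn.jars apply.
  def effectiveJarsKey(conf: Map[String, String]): String =
    if (conf.contains(SPARK_JARS) || !conf.contains(SPARK_YARN_JARS)) SPARK_JARS
    else SPARK_YARN_JARS
}
```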




[GitHub] [incubator-livy] runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn

2019-11-26 Thread GitBox
runzhiwang commented on a change in pull request #242: [LIVY-336][SERVER] Livy 
should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r350674099
 
 

 ##
 File path: server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
 ##
 @@ -122,10 +162,13 @@ class SparkYarnApp private[utils] (
   with Logging {
   import SparkYarnApp._
 
+  appQueue.add(this)
+
   private var killed = false
-  private val appIdPromise: Promise[ApplicationId] = Promise()
   private[utils] var state: SparkApp.State = SparkApp.State.STARTING
   private var yarnDiagnostics: IndexedSeq[String] = IndexedSeq.empty[String]
+  private var appInfo = AppInfo()
 
 Review comment:
   @yiheng it changes at this line: 
https://github.com/apache/incubator-livy/pull/242/files#diff-e0ea718a9f6261f348cba9e32f25da07R321




[GitHub] [incubator-livy] yiheng commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn

2019-11-26 Thread GitBox
yiheng commented on a change in pull request #242: [LIVY-336][SERVER] Livy 
should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r351092771
 
 

 ##
 File path: server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
 ##
 @@ -254,50 +268,45 @@ class SparkYarnApp private[utils] (
 }
   }
 
-  // Exposed for unit test.
-  // TODO Instead of spawning a thread for every session, create a centralized thread and
-  // batch YARN queries.
-  private[utils] val yarnAppMonitorThread = Utils.startDaemonThread(s"yarnAppMonitorThread-$this") {
+  private var yarnTagToAppIdFailedTimes: Int = _
+
+  private def failToMonitor(): Unit = {
+changeState(SparkApp.State.FAILED)
+process.foreach(_.destroy())
+leakedAppTags.put(appTag, System.currentTimeMillis())
+  }
+
+  private def monitorSparkYarnApp(): Unit = {
 try {
+  if (killed) {
+changeState(SparkApp.State.KILLED)
+  } else if (isProcessErrExit()) {
+changeState(SparkApp.State.FAILED)
+  }
   // If appId is not known, query YARN by appTag to get it.
-  val appId = try {
-appIdOption.map(ConverterUtils.toApplicationId).getOrElse {
-  val pollInterval = getYarnPollInterval(livyConf)
-  val deadline = getYarnTagToAppIdTimeout(livyConf).fromNow
-  getAppIdFromTag(appTag, pollInterval, deadline)
+  if (appId.isEmpty) {
+appId = getAppId()
+if (appId.isEmpty) {
+  throw new IllegalStateException(s"No YARN application is found with tag " +
 
 Review comment:
   Instead of throwing an exception, can we move the exception handling code here?




[GitHub] [incubator-livy] yiheng commented on a change in pull request #165: [LIVY-581] Fix edge-case where livy overrides user-provided spark properties instead of appending

2019-11-26 Thread GitBox
yiheng commented on a change in pull request #165: [LIVY-581] Fix edge-case 
where livy overrides user-provided spark properties instead of appending
URL: https://github.com/apache/incubator-livy/pull/165#discussion_r351091430
 
 

 ##
 File path: 
server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
 ##
 @@ -332,7 +348,11 @@ object InteractiveSession extends Logging {
   
LivySparkUtils.formatSparkVersion(livyConf.get(LivyConf.LIVY_SPARK_VERSION))
 val scalaVersion = livyConf.get(LivyConf.LIVY_SPARK_SCALA_VERSION)
 
-mergeConfList(livyJars(livyConf, scalaVersion), LivyConf.SPARK_JARS)
+if (conf.get(LivyConf.SPARK_JARS) != None || conf.get(LivyConf.SPARK_JARS) 
== None){
 
 Review comment:
   For the spark file conf
   ```scala
   val (files, mergeFileConf) = if (!sparkFiles.isEmpty || yarnFiles.isEmpty) {
 (sparkFiles, LivyConf.SPARK_FILES)
   } else {
 (yarnFiles, LivyConf.SPARK_YARN_DIST_FILES)
   }
   ...
   mergeConfList(List(file.getAbsolutePath), mergeFileConf)
   ```




[GitHub] [incubator-livy] yiheng commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn

2019-11-26 Thread GitBox
yiheng commented on a change in pull request #242: [LIVY-336][SERVER] Livy 
should not spawn one thread per job to track the job on Yarn
URL: https://github.com/apache/incubator-livy/pull/242#discussion_r351093478
 
 

 ##
 File path: server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
 ##
 @@ -324,15 +333,22 @@ class SparkYarnApp private[utils] (
 }
   }
 
+  yarnTagToAppIdFailedTimes = 0
+
   debug(s"$appId $state ${yarnDiagnostics.mkString(" ")}")
 } catch {
-  case _: InterruptedException =>
-yarnDiagnostics = ArrayBuffer("Session stopped by user.")
-changeState(SparkApp.State.KILLED)
+  // throw IllegalStateException when getAppId failed
 
 Review comment:
   Why remove the `InterruptedException` handling code?




[GitHub] [incubator-livy] huianyi commented on issue #238: [LIVY-689] Deliver stage process message to the end user using thriftserver

2019-11-26 Thread GitBox
huianyi commented on issue #238: [LIVY-689] Deliver stage process message to 
the end user using thriftserver
URL: https://github.com/apache/incubator-livy/pull/238#issuecomment-558899022
 
 
   @mgaido91 Have I missed something? If we use beeline to connect to hiveserver, hiveserver will catch each job's status from the AppMaster and save these messages in operation logs, which are files located on the hiveserver's machine. Clients will pull these messages in a while loop.
   
   At first, I achieved this in a push-based way: RscDriver would push its job status to the thriftserver. In the current implementation, our thriftserver pulls these job statuses from RscDriver and stores them in a queue (OperationMessages). The beeline client also pulls these messages in a while loop.
   
   Please correct me if I'm wrong, thanks.




[GitHub] [incubator-livy] mgaido91 commented on issue #238: [LIVY-689] Deliver stage process message to the end user using thriftserver

2019-11-26 Thread GitBox
mgaido91 commented on issue #238: [LIVY-689] Deliver stage process message to 
the end user using thriftserver
URL: https://github.com/apache/incubator-livy/pull/238#issuecomment-558972949
 
 
   @huianyi I think what we would like to achieve here is: do not use the operation logs, so that we don't have to keep anything on the server side (by this I mean the Livy server), and hence avoid putting memory pressure on it.
   
   The ideal solution would be:
- the beeline client polls for the status;
- the Livy server queries the Spark session for the query status;
- the Livy server forwards the status back to the beeline client.
   
   In this way, we need to store nothing in the Livy server's memory.
   
   This seems to me to be the approach used in Hive. My understanding is that Hive informs the beeline client about the status and progress of the operation with the code I pointed out in the previous link. So I think we should follow that approach to give feedback on the query status/progress.
   
   Hope this is clearer. If not, please ask about what is unclear. Thanks.
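The pull-based flow described above can be sketched as below: the server buffers nothing, and each client poll triggers a fresh status query that is forwarded to the session. All names here (`SessionBackend`, `ProgressReport`, `StatusEndpoint`) are illustrative, not actual Livy thriftserver APIs.

```scala
// Minimal progress snapshot returned to the client on each poll.
case class ProgressReport(completedStages: Int, totalStages: Int)

trait SessionBackend {
  // Queries the Spark session on demand; nothing is cached server-side.
  def queryProgress(): ProgressReport
}

class StatusEndpoint(backend: SessionBackend) {
  // Invoked on every beeline poll; the server holds no per-query state.
  def getStatus(): ProgressReport = backend.queryProgress()
}
```

The design point is that the server is a stateless forwarder: dropping the timer-fed queue removes the memory pressure the comment is concerned about.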

