attilapiros commented on a change in pull request #28094:
URL: https://github.com/apache/spark/pull/28094#discussion_r416618745



##########
File path: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
##########
@@ -197,10 +217,16 @@ private[spark] class AppStatusListener(
     exec.host = event.executorInfo.executorHost
     exec.isActive = true
     exec.totalCores = event.executorInfo.totalCores
-    exec.maxTasks = event.executorInfo.totalCores / coresPerTask
+    val rpId = event.executorInfo.resourceProfileId
+    val liveRP = liveResourceProfiles.get(rpId)
+    val cpusPerTask = liveRP.map(_.taskResources.get(CPUS)
+      
.map(_.amount.toInt).getOrElse(defaultCoresPerTask)).getOrElse(defaultCoresPerTask)

Review comment:
       What about using `flatMap` and saving the first 
`.getOrElse(defaultCoresPerTask)`?
   
   ```scala
    val cpusPerTask = liveRP.flatMap(_.taskResources.get(CPUS)
          .map(_.amount.toInt)).getOrElse(defaultCoresPerTask)
   ```

##########
File path: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
##########
@@ -44,6 +45,12 @@ private[spark] class AppStatusStore(
     store.read(klass, klass.getName()).info
   }
 
+  def resourceProfileInfo(): Seq[v1.ResourceProfileInfo] = {
+    val klass = classOf[ResourceProfileWrapper]

Review comment:
       This line is not needed.

##########
File path: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
##########
@@ -44,6 +45,12 @@ private[spark] class AppStatusStore(
     store.read(klass, klass.getName()).info
   }
 
+  def resourceProfileInfo(): Seq[v1.ResourceProfileInfo] = {
+    val klass = classOf[ResourceProfileWrapper]
+    val it = store.view(classOf[ResourceProfileWrapper]).asScala.map(_.rpInfo)

Review comment:
       Nit: why is `it` introduced as a `val`? I mean, the last line could be:
   ```scala
   store.view(classOf[ResourceProfileWrapper]).asScala.map(_.rpInfo).toSeq
   ```

##########
File path: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
##########
@@ -145,6 +147,23 @@ private[spark] class AppStatusListener(
     }
   }
 
+  override def onResourceProfileAdded(event: 
SparkListenerResourceProfileAdded): Unit = {
+    val liveRP = new LiveResourceProfile(event.resourceProfile.id)
+    liveResourceProfiles(event.resourceProfile.id) = liveRP
+    liveRP.taskResources = event.resourceProfile.taskResources
+    liveRP.executorResources = event.resourceProfile.executorResources
+    val maxTasks = event.resourceProfile.maxTasksPerExecutor(conf)
+    liveRP.maxTasksPerExecutor = if (event.resourceProfile.isCoresLimitKnown) {
+      Some(maxTasks)
+    } else {
+      None
+    }
+    val rpInfo = new v1.ResourceProfileInfo(liveRP.resourceProfileId,
+      liveRP.executorResources, liveRP.taskResources)
+    logWarning("Resource Profile added id " + liveRP.resourceProfileId)

Review comment:
       This logging was probably left over from a debugging session.

##########
File path: core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala
##########
@@ -38,6 +40,34 @@ private[ui] class EnvironmentPage(
       "Java Home" -> appEnv.runtime.javaHome,
       "Scala Version" -> appEnv.runtime.scalaVersion)
 
+    def constructExecutorRequestString(ereqs: Map[String, 
ExecutorResourceRequest]): String = {

Review comment:
       Nit: missing camel case in `ereqs`. What about `execReqs`? 

##########
File path: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
##########
@@ -159,10 +178,11 @@ private[spark] class AppStatusListener(
       details.getOrElse("Spark Properties", Nil),
       details.getOrElse("Hadoop Properties", Nil),
       details.getOrElse("System Properties", Nil),
-      details.getOrElse("Classpath Entries", Nil))
+      details.getOrElse("Classpath Entries", Nil),
+      Nil)
 
-    coresPerTask = 
envInfo.sparkProperties.toMap.get(CPUS_PER_TASK.key).map(_.toInt)
-      .getOrElse(coresPerTask)
+    defaultCoresPerTask = 
envInfo.sparkProperties.toMap.get(CPUS_PER_TASK.key).map(_.toInt)

Review comment:
       Nit: Sometimes the code talks about cores and sometimes about CPUs and 
within this line we have a conversion between the two. Is our terminology 
correct?  

##########
File path: core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala
##########
@@ -38,6 +40,34 @@ private[ui] class EnvironmentPage(
       "Java Home" -> appEnv.runtime.javaHome,
       "Scala Version" -> appEnv.runtime.scalaVersion)
 
+    def constructExecutorRequestString(ereqs: Map[String, 
ExecutorResourceRequest]): String = {
+      ereqs.map {
+        case (_, ereq) =>
+          val execStr = new mutable.StringBuilder()
+          execStr ++= s"\t${ereq.resourceName}: [amount: ${ereq.amount}"
+          if (ereq.discoveryScript.nonEmpty) execStr ++= s", discovery: 
${ereq.discoveryScript}"

Review comment:
       Nit: use `{` and `}`
   https://github.com/databricks/scala-style-guide#curly
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to