tgravescs commented on a change in pull request #31974:
URL: https://github.com/apache/spark/pull/31974#discussion_r607180363



##########
File path: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
##########
@@ -78,15 +78,19 @@ private[spark] class AppStatusListener(
   private val liveRDDs = new HashMap[Int, LiveRDD]()
   private val pools = new HashMap[String, SchedulerPool]()
   private val liveResourceProfiles = new HashMap[Int, LiveResourceProfile]()
+  private[spark] val liveMiscellaneousProcess = new HashMap[String, LiveMiscellaneousProcess]()
 
   private val SQL_EXECUTION_ID_KEY = "spark.sql.execution.id"
   // Keep the active executor count as a separate variable to avoid having to do synchronization
   // around liveExecutors.
   @volatile private var activeExecutorCount = 0
+  @volatile private var activeProcessCount = 0
 
   /** The last time when flushing `LiveEntity`s. This is to avoid flushing too frequently. */
   private var lastFlushTimeNs = System.nanoTime()
 
+  private[spark] lazy val yarnAMID = "yarn-am"

Review comment:
       Put the process name in the event so that nothing YARN-specific is
hardcoded in here.
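
   A minimal sketch of what that could look like, assuming the event carries
its own identifier. `ProcessEvent` and `ProcessTracker` are hypothetical names
used only for illustration; they are not part of this PR or of Spark.

```scala
import scala.collection.mutable.HashMap

// Hypothetical event shape: the sender supplies processId, so the listener
// never needs a resource-manager-specific constant such as "yarn-am".
case class ProcessEvent(processId: String, hostPort: String)

class ProcessTracker {
  // Keyed by whatever id the event carries.
  val liveProcesses = new HashMap[String, String]()

  def record(event: ProcessEvent): Unit = {
    liveProcesses(event.processId) = event.hostPort
  }
}
```

   With this shape the YARN AM would send its own id, and any other process
type could reuse the same path without touching the listener.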

##########
File path: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
##########
@@ -78,15 +78,19 @@ private[spark] class AppStatusListener(
   private val liveRDDs = new HashMap[Int, LiveRDD]()
   private val pools = new HashMap[String, SchedulerPool]()
   private val liveResourceProfiles = new HashMap[Int, LiveResourceProfile]()
+  private[spark] val liveMiscellaneousProcess = new HashMap[String, LiveMiscellaneousProcess]()
 
   private val SQL_EXECUTION_ID_KEY = "spark.sql.execution.id"
   // Keep the active executor count as a separate variable to avoid having to do synchronization
   // around liveExecutors.
   @volatile private var activeExecutorCount = 0
+  @volatile private var activeProcessCount = 0

Review comment:
       I don't see this being used anywhere; please remove it.

##########
File path: core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
##########
@@ -227,6 +227,13 @@ case class SparkListenerUnschedulableTaskSetRemoved(
 @DeveloperApi
 case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends SparkListenerEvent
 
+@DeveloperApi
+case class MiscellaneousProcessInfoEvent(time: Long,
+  cores: Int,
+  memory: Long,
+  hostName: String,

Review comment:
       Use hostPort instead of hostName.

##########
File path: core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
##########
@@ -227,6 +227,13 @@ case class SparkListenerUnschedulableTaskSetRemoved(
 @DeveloperApi
 case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends SparkListenerEvent
 
+@DeveloperApi
+case class MiscellaneousProcessInfoEvent(time: Long,

Review comment:
       I would rather see this called MiscellaneousProcessAdded. Can we then
hide the details in another class like MiscellaneousProcessInfo (similar to
ExecutorInfo)? That way, if we need to add other fields, we can do it without
modifying this event. So this event would have time, processId, and a
MiscellaneousProcessInfo.
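
   Roughly, the split could look like the sketch below. The field names are
taken from the current event (with hostName renamed to hostPort); the
`Map[String, String]` type for urlInfo and the local `SparkListenerEvent`
stand-in are assumptions made so the snippet compiles on its own.

```scala
// Stand-in for org.apache.spark.scheduler.SparkListenerEvent so the sketch
// is self-contained; the real trait lives in Spark core.
trait SparkListenerEvent

// Details live here, so new fields can be added later without changing
// the event's signature. urlInfo's type is an assumption.
class MiscellaneousProcessInfo(
    val hostPort: String,
    val cores: Int,
    val memory: Long,
    val urlInfo: Map[String, String])

// The event itself only carries time, processId, and the info object.
case class MiscellaneousProcessAdded(
    time: Long,
    processId: String,
    info: MiscellaneousProcessInfo) extends SparkListenerEvent
```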
   

##########
File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
##########
@@ -776,6 +781,13 @@ private[spark] class ApplicationMaster(
 
     override def onStart(): Unit = {
       driver.send(RegisterClusterManager(self))
+      // if deployment mode for yarn Application is client
+      // then send the AM Log Info to spark driver
+      if (!isClusterMode) {
+        val hostName = YarnContainerInfoHelper.getNodeManagerHttpAddress(None)

Review comment:
       This is actually host and port, right? If so, let's rename the variable
to match.

##########
File path: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
##########
@@ -107,6 +111,8 @@ private[spark] class AppStatusListener(
 
   override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
     case SparkListenerLogStart(version) => sparkVersion = version
+    case MiscellaneousProcessInfoEvent(time, cores, memory, hostName, urlInfo) =>
+      updateAMInfoInLiveProcess(time, cores, memory, hostName, urlInfo.toMap)

Review comment:
       Rename this to something like updateMiscProcessInfo.
   
   Also, is there any reason you didn't create an
onMiscellaneousProcessAdded-type function?
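
   Something along these lines is what I have in mind. This is only a sketch:
`SketchListener`, the simplified event, and the local `SparkListenerEvent`
trait are stand-ins so the snippet compiles by itself, not the actual
AppStatusListener code.

```scala
import scala.collection.mutable.HashMap

// Stand-ins so the sketch is self-contained (not the real Spark classes).
trait SparkListenerEvent
case class MiscellaneousProcessAdded(time: Long, processId: String, hostPort: String)
  extends SparkListenerEvent

class SketchListener {
  val liveMiscellaneousProcess = new HashMap[String, String]()

  // onOtherEvent only dispatches; the logic sits in a dedicated handler.
  def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case e: MiscellaneousProcessAdded => onMiscellaneousProcessAdded(e)
    case _ => // ignore events this listener does not handle
  }

  private def onMiscellaneousProcessAdded(event: MiscellaneousProcessAdded): Unit = {
    liveMiscellaneousProcess(event.processId) = event.hostPort
  }
}
```

   Keeping the match arm to a single call mirrors how the other on* handlers
are structured and leaves room for a matching "removed" handler later.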



