rezasafi commented on a change in pull request #23306: [SPARK-26357][Core] Expose executors' procfs metrics to Metrics system
URL: https://github.com/apache/spark/pull/23306#discussion_r242354662
 
 

 ##########
 File path: 
core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala
 ##########
 @@ -195,45 +197,56 @@ private[spark] class ProcfsMetricsGetter(procfsDir: 
String = "/proc/") extends L
         if (procInfoSplit(1).toLowerCase(Locale.US).contains("java")) {
           allMetrics.copy(
             jvmVmemTotal = allMetrics.jvmVmemTotal + vmem,
-            jvmRSSTotal = allMetrics.jvmRSSTotal + (rssMem)
+            jvmRSSTotal = allMetrics.jvmRSSTotal + (rssMem),
+            timeStamp = System.currentTimeMillis
           )
         }
         else if (procInfoSplit(1).toLowerCase(Locale.US).contains("python")) {
           allMetrics.copy(
             pythonVmemTotal = allMetrics.pythonVmemTotal + vmem,
-            pythonRSSTotal = allMetrics.pythonRSSTotal + (rssMem)
+            pythonRSSTotal = allMetrics.pythonRSSTotal + (rssMem),
+            timeStamp = System.currentTimeMillis
           )
         }
         else {
           allMetrics.copy(
             otherVmemTotal = allMetrics.otherVmemTotal + vmem,
-            otherRSSTotal = allMetrics.otherRSSTotal + (rssMem)
+            otherRSSTotal = allMetrics.otherRSSTotal + (rssMem),
+            timeStamp = System.currentTimeMillis
           )
         }
       }
     } catch {
       case f: IOException =>
         logWarning("There was a problem with reading" +
           " the stat file of the process. ", f)
-        ProcfsMetrics(0, 0, 0, 0, 0, 0)
+        ProcfsMetrics(0, 0, 0, 0, 0, 0, System.currentTimeMillis)
     }
   }
 
   private[spark] def computeAllMetrics(): ProcfsMetrics = {
     if (!isAvailable) {
-      return ProcfsMetrics(0, 0, 0, 0, 0, 0)
+      return ProcfsMetrics(0, 0, 0, 0, 0, 0, System.currentTimeMillis)
     }
-    val pids = computeProcessTree
-    var allMetrics = ProcfsMetrics(0, 0, 0, 0, 0, 0)
-    for (p <- pids) {
-      allMetrics = addProcfsMetricsFromOneProcess(allMetrics, p)
-      // if we had an error getting any of the metrics, we don't want to 
report partial metrics, as
-      // that would be misleading.
-      if (!isAvailable) {
-        return ProcfsMetrics(0, 0, 0, 0, 0, 0)
+    val lastMetricComputation = System.currentTimeMillis() - 
cachedAllMetric.timeStamp
+    // Check whether we have computed the metrics in the past 1s
+    // ToDo: Should we make this configurable?
+    if(lastMetricComputation > Math.min(1000, HEARTBEAT_INTERVAL_MS)) {
 
 Review comment:
   I want to do some testing before adding a synchronized method. Metrics don't
seem to be that critical, and removing a benign race at the cost of worse
performance isn't a good idea. As a last resort, I think defining this method
as synchronized is OK.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to