rezasafi commented on a change in pull request #23306: [SPARK-26357][Core]
Expose executors' procfs metrics to Metrics system
URL: https://github.com/apache/spark/pull/23306#discussion_r242295292
##########
File path:
core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala
##########
@@ -195,45 +197,56 @@ private[spark] class ProcfsMetricsGetter(procfsDir:
String = "/proc/") extends L
if (procInfoSplit(1).toLowerCase(Locale.US).contains("java")) {
allMetrics.copy(
jvmVmemTotal = allMetrics.jvmVmemTotal + vmem,
- jvmRSSTotal = allMetrics.jvmRSSTotal + (rssMem)
+ jvmRSSTotal = allMetrics.jvmRSSTotal + (rssMem),
+ timeStamp = System.currentTimeMillis
)
}
else if (procInfoSplit(1).toLowerCase(Locale.US).contains("python")) {
allMetrics.copy(
pythonVmemTotal = allMetrics.pythonVmemTotal + vmem,
- pythonRSSTotal = allMetrics.pythonRSSTotal + (rssMem)
+ pythonRSSTotal = allMetrics.pythonRSSTotal + (rssMem),
+ timeStamp = System.currentTimeMillis
)
}
else {
allMetrics.copy(
otherVmemTotal = allMetrics.otherVmemTotal + vmem,
- otherRSSTotal = allMetrics.otherRSSTotal + (rssMem)
+ otherRSSTotal = allMetrics.otherRSSTotal + (rssMem),
+ timeStamp = System.currentTimeMillis
)
}
}
} catch {
case f: IOException =>
logWarning("There was a problem with reading" +
" the stat file of the process. ", f)
- ProcfsMetrics(0, 0, 0, 0, 0, 0)
+ ProcfsMetrics(0, 0, 0, 0, 0, 0, System.currentTimeMillis)
}
}
private[spark] def computeAllMetrics(): ProcfsMetrics = {
if (!isAvailable) {
- return ProcfsMetrics(0, 0, 0, 0, 0, 0)
+ return ProcfsMetrics(0, 0, 0, 0, 0, 0, System.currentTimeMillis)
}
- val pids = computeProcessTree
- var allMetrics = ProcfsMetrics(0, 0, 0, 0, 0, 0)
- for (p <- pids) {
- allMetrics = addProcfsMetricsFromOneProcess(allMetrics, p)
- // if we had an error getting any of the metrics, we don't want to
report partial metrics, as
- // that would be misleading.
- if (!isAvailable) {
- return ProcfsMetrics(0, 0, 0, 0, 0, 0)
+ val lastMetricComputation = System.currentTimeMillis() -
cachedAllMetric.timeStamp
+ // Check whether we have computed the metrics in the past 1s
+ // ToDo: Should we make this configurable?
+ if(lastMetricComputation > Math.min(1000, HEARTBEAT_INTERVAL_MS)) {
Review comment:
Just to add more context about having 1000ms here. The polling interval for
the Metrics system can't be less than 1 second, so users can configure the
caching period via the heartbeat interval if they want to cache for less than
1 second. The configuration option would let them have a cache that is valid
for more than 1 second.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]