EBernhardson has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/395915 )
Change subject: [DNM] more debugging of RssFile explosion
......................................................................
[DNM] more debugging of RssFile explosion
Change-Id: Ia5cdb36f31dd4512b048de74f2b2d769d2fa7acf
---
M jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/ResourceMonitorThread.scala
M jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala
2 files changed, 117 insertions(+), 8 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/search/xgboost refs/changes/15/395915/1
diff --git a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/ResourceMonitorThread.scala b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/ResourceMonitorThread.scala
index 22ed87c..e8f55b4 100644
--- a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/ResourceMonitorThread.scala
+++ b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/ResourceMonitorThread.scala
@@ -16,13 +16,15 @@
package ml.dmlc.xgboost4j.scala.spark
+import java.io.{BufferedReader, File, FileReader}
import java.lang.management.ManagementFactory
import java.util.concurrent.atomic.AtomicBoolean
+import java.util.regex.Pattern
import org.apache.commons.logging.LogFactory
import scala.concurrent.duration.Duration
-import scala.io.Source
+import scala.io.{BufferedSource, Source}
class ResourceMonitorThread(reportEvery: Duration) extends Thread {
super.setDaemon(true)
@@ -37,18 +39,120 @@
return
}
while (keepChecking.get()) {
- report()
+ report().foreach(logger.info)
Thread.sleep(reportEvery.toMillis)
}
}
+ /** Clears the `keepChecking` flag that the reporting loop tests before each iteration. */
def stopChecking(): Unit = keepChecking.set(false)
- private def report(): Unit = {
+ /**
+ * Builds the strings that make up one resource report.
+ *
+ * The result holds, in order: the lines of `/proc/$pid/status` that start with "Rss",
+ * joined with ", "; the string form of `memoryBean.getHeapMemoryUsage`; and a single
+ * newline-joined string with one "mem: name" entry for each of the first five results
+ * of `collectSMapInfo()`. Empty strings are dropped from the result.
+ *
+ * @return the non-empty report strings
+ */
+ def report(): Seq[String] = {
+ // NOTE(review): nothing in this method closes the Source opened below - confirm that
+ // the file handle is not leaked on every call.
val rss = Source.fromFile(s"/proc/$pid/status").getLines()
.filter(_.startsWith("Rss"))
.mkString(", ")
- logger.info(rss)
- logger.info(memoryBean.getHeapMemoryUsage)
+ Seq(rss,
+ memoryBean.getHeapMemoryUsage.toString,
+ // 5 largest contributors to RSSFile
+ collectSMapInfo().take(5).map({ info =>
+ s"${info.mem()}: ${info.name}"
+ }).mkString("\n")
+ ).filter(_.length > 0)
}
+
+ // Matches a line that starts with "<hex>-<hex>", optional whitespace and a run of
+ // permission characters; the groups are start, end, whitespace and permission.
+ private val ADDRESS_PATTERN =
raw"^([a-f0-9]*)-([a-f0-9]*)(\s)*([rxwps\-]*).*".r
+ // Matches "<key>: <value>" lines whose key begins with an upper-case letter.
+ private val MEM_INFO_PATTERN = raw"^([A-Z].*):[\s ]*(.*).*".r
+ // Suffix that collectSMapInfo() strips from a value before handing it on.
+ private val KB = "kB"
+ // Permission strings of the entries that collectSMapInfo() filters out.
+ private val READ_ONLY_WITH_SHARED_PERMISSION = "r--s"
+ private val READ_EXECUTE_WITH_SHARED_PERMISSION = "r-xs"
+
+ // smaps file for `pid`; collectSMapInfo() returns Nil when it does not exist.
+ val file = new File(s"/proc/$pid/smaps")
+
+  /**
+   * Parses `/proc/$pid/smaps` into one entry per mapped region.
+   *
+   * Every line matching `ADDRESS_PATTERN` starts a new region. Each following line matching
+   * `MEM_INFO_PATTERN` is applied to the most recently started region, after the `KB` suffix
+   * has been removed from its value. Regions whose permission is `r--s` or `r-xs` are dropped.
+   *
+   * @return the remaining regions ordered by `mem()`, largest first, or `Nil` when the smaps
+   *         file does not exist
+   */
+  private def collectSMapInfo(): List[ProcessSmapMemoryInfo] = {
+    if (!file.exists()) {
+      Nil
+    } else {
+      val source = Source.fromFile(file)
+      val regions =
+        try {
+          // foldLeft drains the lazy line iterator, so the list is fully built before close().
+          source.getLines().map(_.trim).foldLeft(List.empty[ProcessSmapMemoryInfo]) {
+            (acc, line) =>
+              line match {
+                case ADDRESS_PATTERN(_, _, _, permission) =>
+                  new ProcessSmapMemoryInfo(line, permission) :: acc
+                case MEM_INFO_PATTERN(key, value) =>
+                  acc match {
+                    case current :: earlier =>
+                      current.setMemInfo(key.trim, value.replace(KB, "").trim) :: earlier
+                    // A field line seen before any address line has no region to attach to.
+                    case Nil => Nil
+                  }
+                case _ => acc
+              }
+          }
+        } finally {
+          // report() calls this method on every invocation, so an unclosed Source would
+          // cost one file descriptor per report.
+          source.close()
+        }
+      regions.filterNot { region =>
+        val permission = region.permission.trim
+        permission.equalsIgnoreCase(READ_ONLY_WITH_SHARED_PERMISSION) ||
+          permission.equalsIgnoreCase(READ_EXECUTE_WITH_SHARED_PERMISSION)
+      }.sortBy(_.mem()).reverse
+    }
+  }
+}
+
+/**
+ * Mutable record of the counters collected for one mapped region.
+ *
+ * @param name       text that identifies the region
+ * @param permission permission string of the region
+ */
+class ProcessSmapMemoryInfo(val name: String, val permission: String) {
+  var size: Int = 0
+  var rss: Int = 0
+  var pss: Int = 0
+  var sharedClean: Int = 0
+  var sharedDirty: Int = 0
+  var privateClean: Int = 0
+  var privateDirty: Int = 0
+  var referenced: Int = 0
+  var regionName: String = ""
+
+  /**
+   * Stores `value` in the counter selected by `key`.
+   *
+   * A value that is not a valid integer, or a key that has no dedicated counter, leaves
+   * the record untouched.
+   *
+   * @param key   field label, resolved through `MemInfo(key)`
+   * @param value decimal text of the counter
+   * @return this instance, so that calls can be chained
+   */
+  def setMemInfo(key: String, value: String): ProcessSmapMemoryInfo = {
+    // Parse first: a non-numeric value means the key is never looked up.
+    val parsed: Option[Int] =
+      try {
+        Some(value.trim.toInt)
+      } catch {
+        case _: NumberFormatException => None
+      }
+    parsed.foreach { amount =>
+      MemInfo(key) match {
+        case MemInfo.SIZE => size = amount
+        case MemInfo.RSS => rss = amount
+        case MemInfo.PSS => pss = amount
+        case MemInfo.SHARED_CLEAN => sharedClean = amount
+        case MemInfo.SHARED_DIRTY => sharedDirty = amount
+        case MemInfo.PRIVATE_CLEAN => privateClean = amount
+        case MemInfo.PRIVATE_DIRTY => privateDirty = amount
+        case MemInfo.REFERENCED => referenced = amount
+        case _ => ()
+      }
+    }
+    this
+  }
+
+  /** Figure used to rank regions; currently the `rss` counter. */
+  def mem(): Int = {
+    // Alternative kept for reference: Math.min(sharedDirty, pss) + privateDirty + privateClean
+    rss
+  }
+}
+
+/** Field labels recognised when reading the per-region entries of an smaps file. */
+object MemInfo {
+  /** One field, identified by the label it carries in the smaps text. */
+  sealed abstract class MemInfoVal(val name: String) {
+    override def toString: String = name
+  }
+
+  /**
+   * Looks a field up by label, ignoring case and surrounding whitespace.
+   *
+   * @param name label as read from the file
+   * @return the matching member of `values`, or `INVALID` when none matches
+   */
+  def apply(name: String): MemInfoVal = {
+    val wanted = name.trim
+    values.find(_.name.equalsIgnoreCase(wanted)).getOrElse(INVALID)
+  }
+
+  /** Every field that `apply` can resolve; `INVALID` is not a member. */
+  val values: Seq[MemInfoVal] = Seq(
+    SIZE, RSS, PSS, SHARED_CLEAN, SHARED_DIRTY, PRIVATE_CLEAN, PRIVATE_DIRTY,
+    REFERENCED, ANONYMOUS, ANON_HUGE_PAGES, SWAP, KERNEL_PAGE_SIZE, MMU_PAGE_SIZE)
+
+  case object SIZE extends MemInfoVal("Size")
+  case object RSS extends MemInfoVal("Rss")
+  case object PSS extends MemInfoVal("Pss")
+  case object SHARED_CLEAN extends MemInfoVal("Shared_Clean")
+  case object SHARED_DIRTY extends MemInfoVal("Shared_Dirty")
+  case object PRIVATE_CLEAN extends MemInfoVal("Private_Clean")
+  case object PRIVATE_DIRTY extends MemInfoVal("Private_Dirty")
+  case object REFERENCED extends MemInfoVal("Referenced")
+  case object ANONYMOUS extends MemInfoVal("Anonymous")
+  // The kernel writes this label without underscores; an "Anon_Huge_Pages" spelling
+  // can never match a line of the file.
+  case object ANON_HUGE_PAGES extends MemInfoVal("AnonHugePages")
+  case object SWAP extends MemInfoVal("swap")
+  case object KERNEL_PAGE_SIZE extends MemInfoVal("kernelPageSize")
+  case object MMU_PAGE_SIZE extends MemInfoVal("mmuPageSize")
+  /** Returned by `apply` for labels that match no known field. */
+  case object INVALID extends MemInfoVal("invalid")
 }
diff --git a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala
index cf9aa4c..b8b0c1b 100644
--- a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala
+++ b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala
@@ -17,19 +17,19 @@
package ml.dmlc.xgboost4j.scala.spark
import java.nio.file.Files
-import java.util.concurrent.LinkedBlockingDeque
+import java.util.concurrent.{LinkedBlockingDeque, TimeUnit}
import scala.util.Random
-
import ml.dmlc.xgboost4j.java.Rabit
import ml.dmlc.xgboost4j.scala.DMatrix
import ml.dmlc.xgboost4j.scala.rabit.RabitTracker
-
import org.apache.spark.SparkContext
import org.apache.spark.ml.feature.{LabeledPoint => MLLabeledPoint}
import org.apache.spark.ml.linalg.{DenseVector, Vectors, Vector => SparkVector}
import org.apache.spark.rdd.RDD
import org.scalatest.FunSuite
+
+import scala.concurrent.duration.Duration
class XGBoostGeneralSuite extends FunSuite with PerTest {
test("test Rabit allreduce to validate Scala-implemented Rabit tracker") {
@@ -391,4 +391,9 @@
assert(eval.eval(xgBoostModel.booster.predict(testSetDMatrix, outPutMargin
= true),
testSetDMatrix) < 0.1)
}
+
+  test("resource monitor") {
+    // Smoke check only: build a monitor with a ten-second interval and print one report.
+    val reportInterval = Duration(10, TimeUnit.SECONDS)
+    val monitor = new ResourceMonitorThread(reportInterval)
+    val reportLines = monitor.report()
+    println(reportLines.mkString("\n"))
+  }
}
--
To view, visit https://gerrit.wikimedia.org/r/395915
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ia5cdb36f31dd4512b048de74f2b2d769d2fa7acf
Gerrit-PatchSet: 1
Gerrit-Project: search/xgboost
Gerrit-Branch: master
Gerrit-Owner: EBernhardson <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits