EBernhardson has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/395915 )
Change subject: [DNM] more debugging of RssFile explosion ...................................................................... [DNM] more debugging of RssFile explosion Change-Id: Ia5cdb36f31dd4512b048de74f2b2d769d2fa7acf --- M jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/ResourceMonitorThread.scala M jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala 2 files changed, 117 insertions(+), 8 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/search/xgboost refs/changes/15/395915/1 diff --git a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/ResourceMonitorThread.scala b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/ResourceMonitorThread.scala index 22ed87c..e8f55b4 100644 --- a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/ResourceMonitorThread.scala +++ b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/ResourceMonitorThread.scala @@ -16,13 +16,15 @@ package ml.dmlc.xgboost4j.scala.spark +import java.io.{BufferedReader, File, FileReader} import java.lang.management.ManagementFactory import java.util.concurrent.atomic.AtomicBoolean +import java.util.regex.Pattern import org.apache.commons.logging.LogFactory import scala.concurrent.duration.Duration -import scala.io.Source +import scala.io.{BufferedSource, Source} class ResourceMonitorThread(reportEvery: Duration) extends Thread { super.setDaemon(true) @@ -37,18 +39,120 @@ return } while (keepChecking.get()) { - report() + report().foreach(logger.info) Thread.sleep(reportEvery.toMillis) } } def stopChecking(): Unit = keepChecking.set(false) - private def report(): Unit = { + def report(): Seq[String] = { val rss = Source.fromFile(s"/proc/$pid/status").getLines() .filter(_.startsWith("Rss")) .mkString(", ") - logger.info(rss) - logger.info(memoryBean.getHeapMemoryUsage) + Seq(rss, + memoryBean.getHeapMemoryUsage.toString, + // 5 largest contributors to RSSFile + collectSMapInfo().take(5).map({ info => + s"${info.mem()}: ${info.name}" + }).mkString("\n") + ).filter(_.length > 0) } + + private val ADDRESS_PATTERN = raw"^([a-f0-9]*)-([a-f0-9]*)(\s)*([rxwps\-]*).*".r + private val MEM_INFO_PATTERN = raw"^([A-Z].*):[\s ]*(.*).*".r + private val KB = "kB" + private val READ_ONLY_WITH_SHARED_PERMISSION = "r--s" + private val READ_EXECUTE_WITH_SHARED_PERMISSION = "r-xs" + + val file = new File(s"/proc/$pid/smaps") + + private def collectSMapInfo(): List[ProcessSmapMemoryInfo] = { + if (!file.exists()) { + return Nil + } + val lines = Source.fromFile(s"/proc/$pid/smaps").getLines() + lines.map(_.trim).foldLeft(List[ProcessSmapMemoryInfo]()) { (acc, line) => + line match { + case ADDRESS_PATTERN(startAddr, endAddr, space, permission) => + new ProcessSmapMemoryInfo(line, permission) :: acc + case MEM_INFO_PATTERN(key, value) => + acc match { + case memInfo :: xs => memInfo.setMemInfo(key.trim, value.replace(KB, "").trim) :: xs + case Nil => Nil + } + case _ => acc + } + }.filter { memInfo => + !memInfo.permission.trim.equalsIgnoreCase(READ_ONLY_WITH_SHARED_PERMISSION) && + !memInfo.permission.trim.equalsIgnoreCase(READ_EXECUTE_WITH_SHARED_PERMISSION) + }.sortBy(_.mem()).reverse + } +} + +class ProcessSmapMemoryInfo(val name: String, val permission: String) { + var size: Int = 0 + var rss: Int = 0 + var pss: Int = 0 + var sharedClean: Int = 0 + var sharedDirty: Int = 0 + var privateClean: Int = 0 + var privateDirty: Int = 0 + var referenced: Int = 0 + var regionName: String = "" + + def setMemInfo(key: String, value: String): ProcessSmapMemoryInfo = { + try { + val intval = value.trim.toInt + MemInfo(key) match { + case MemInfo.SIZE => size = intval + case MemInfo.RSS => rss = intval + case MemInfo.PSS => pss = intval + case MemInfo.SHARED_CLEAN => sharedClean = intval + case MemInfo.SHARED_DIRTY => sharedDirty = intval + case MemInfo.PRIVATE_CLEAN => privateClean = intval + case MemInfo.PRIVATE_DIRTY => privateDirty = intval + case MemInfo.REFERENCED => referenced = intval + case _ => None + } + } catch { + case e: NumberFormatException => Nil + } + this + } + + def mem(): Int = { + // Math.min(sharedDirty, pss) + privateDirty + privateClean + rss + } +} + +object MemInfo { + sealed abstract class MemInfoVal(val name: String) { + override def toString: String = name + } + + def apply(name: String): MemInfoVal = { + values.collectFirst { case i if i.name.equalsIgnoreCase(name.trim) => i }.getOrElse(INVALID) + } + + val values: Seq[MemInfoVal] = Seq( + SIZE, RSS, PSS, SHARED_CLEAN, SHARED_DIRTY, PRIVATE_CLEAN, PRIVATE_DIRTY, + REFERENCED, ANONYMOUS, ANON_HUGE_PAGES, SWAP, KERNEL_PAGE_SIZE, MMU_PAGE_SIZE) + + + case object SIZE extends MemInfoVal("Size") + case object RSS extends MemInfoVal("Rss") + case object PSS extends MemInfoVal("Pss") + case object SHARED_CLEAN extends MemInfoVal("Shared_Clean") + case object SHARED_DIRTY extends MemInfoVal("Shared_Dirty") + case object PRIVATE_CLEAN extends MemInfoVal("Private_Clean") + case object PRIVATE_DIRTY extends MemInfoVal("Private_Dirty") + case object REFERENCED extends MemInfoVal("Referenced") + case object ANONYMOUS extends MemInfoVal("Anonymous") + case object ANON_HUGE_PAGES extends MemInfoVal("Anon_Huge_Pages") + case object SWAP extends MemInfoVal("swap") + case object KERNEL_PAGE_SIZE extends MemInfoVal("kernelPageSize") + case object MMU_PAGE_SIZE extends MemInfoVal("mmuPageSize") + case object INVALID extends MemInfoVal("invalid") } diff --git a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala index cf9aa4c..b8b0c1b 100644 --- a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala +++ b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala @@ -17,19 +17,19 @@ package ml.dmlc.xgboost4j.scala.spark import java.nio.file.Files -import java.util.concurrent.LinkedBlockingDeque +import java.util.concurrent.{LinkedBlockingDeque, TimeUnit} import scala.util.Random - import ml.dmlc.xgboost4j.java.Rabit import ml.dmlc.xgboost4j.scala.DMatrix import ml.dmlc.xgboost4j.scala.rabit.RabitTracker - import org.apache.spark.SparkContext import org.apache.spark.ml.feature.{LabeledPoint => MLLabeledPoint} import org.apache.spark.ml.linalg.{DenseVector, Vectors, Vector => SparkVector} import org.apache.spark.rdd.RDD import org.scalatest.FunSuite + +import scala.concurrent.duration.Duration class XGBoostGeneralSuite extends FunSuite with PerTest { test("test Rabit allreduce to validate Scala-implemented Rabit tracker") { @@ -391,4 +391,9 @@ assert(eval.eval(xgBoostModel.booster.predict(testSetDMatrix, outPutMargin = true), testSetDMatrix) < 0.1) } + + test("resource monitor") { + val rm = new ResourceMonitorThread(Duration(10, TimeUnit.SECONDS)) + println(rm.report().mkString("\n")) + } } -- To view, visit https://gerrit.wikimedia.org/r/395915 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Ia5cdb36f31dd4512b048de74f2b2d769d2fa7acf Gerrit-PatchSet: 1 Gerrit-Project: search/xgboost Gerrit-Branch: master Gerrit-Owner: EBernhardson <ebernhard...@wikimedia.org> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits