EBernhardson has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/395915 )

Change subject: [DNM] more debugging of RssFile explosion
......................................................................

[DNM] more debugging of RssFile explosion

Change-Id: Ia5cdb36f31dd4512b048de74f2b2d769d2fa7acf
---
M jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/ResourceMonitorThread.scala
M jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala
2 files changed, 117 insertions(+), 8 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/search/xgboost refs/changes/15/395915/1

diff --git a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/ResourceMonitorThread.scala b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/ResourceMonitorThread.scala
index 22ed87c..e8f55b4 100644
--- a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/ResourceMonitorThread.scala
+++ b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/ResourceMonitorThread.scala
@@ -16,13 +16,15 @@
 
 package ml.dmlc.xgboost4j.scala.spark
 
+import java.io.{BufferedReader, File, FileReader}
 import java.lang.management.ManagementFactory
 import java.util.concurrent.atomic.AtomicBoolean
+import java.util.regex.Pattern
 
 import org.apache.commons.logging.LogFactory
 
 import scala.concurrent.duration.Duration
-import scala.io.Source
+import scala.io.{BufferedSource, Source}
 
 class ResourceMonitorThread(reportEvery: Duration) extends Thread {
   super.setDaemon(true)
@@ -37,18 +39,120 @@
       return
     }
     while (keepChecking.get()) {
-      report()
+      report().foreach(logger.info)
       Thread.sleep(reportEvery.toMillis)
     }
   }
 
   def stopChecking(): Unit = keepChecking.set(false)
 
-  private def report(): Unit = {
+  def report(): Seq[String] = {
     val rss = Source.fromFile(s"/proc/$pid/status").getLines()
       .filter(_.startsWith("Rss"))
       .mkString(", ")
-    logger.info(rss)
-    logger.info(memoryBean.getHeapMemoryUsage)
+    Seq(rss,
+      memoryBean.getHeapMemoryUsage.toString,
+      // 5 largest contributors to RSSFile
+      collectSMapInfo().take(5).map({ info =>
+        s"${info.mem()}: ${info.name}"
+      }).mkString("\n")
+    ).filter(_.length > 0)
   }
+
+  private val ADDRESS_PATTERN = raw"^([a-f0-9]*)-([a-f0-9]*)(\s)*([rxwps\-]*).*".r
+  private val MEM_INFO_PATTERN = raw"^([A-Z].*):[\s ]*(.*).*".r
+  private val KB = "kB"
+  private val READ_ONLY_WITH_SHARED_PERMISSION = "r--s"
+  private val READ_EXECUTE_WITH_SHARED_PERMISSION = "r-xs"
+
+  val file = new File(s"/proc/$pid/smaps")
+
+  private def collectSMapInfo(): List[ProcessSmapMemoryInfo] = {
+    if (!file.exists()) {
+      return Nil
+    }
+    val lines = Source.fromFile(s"/proc/$pid/smaps").getLines()
+    lines.map(_.trim).foldLeft(List[ProcessSmapMemoryInfo]()) { (acc, line) =>
+      line match {
+        case ADDRESS_PATTERN(startAddr, endAddr, space, permission) =>
+          new ProcessSmapMemoryInfo(line, permission) :: acc
+        case MEM_INFO_PATTERN(key, value) =>
+          acc match {
+            case memInfo :: xs => memInfo.setMemInfo(key.trim, value.replace(KB, "").trim) :: xs
+            case Nil => Nil
+          }
+        case _ => acc
+      }
+    }.filter { memInfo =>
+      !memInfo.permission.trim.equalsIgnoreCase(READ_ONLY_WITH_SHARED_PERMISSION) &&
+      !memInfo.permission.trim.equalsIgnoreCase(READ_EXECUTE_WITH_SHARED_PERMISSION)
+    }.sortBy(_.mem()).reverse
+  }
+}
+
+class ProcessSmapMemoryInfo(val name: String, val permission: String) {
+  var size: Int = 0
+  var rss: Int = 0
+  var pss: Int = 0
+  var sharedClean: Int = 0
+  var sharedDirty: Int = 0
+  var privateClean: Int = 0
+  var privateDirty: Int = 0
+  var referenced: Int = 0
+  var regionName: String = ""
+
+  def setMemInfo(key: String, value: String): ProcessSmapMemoryInfo = {
+    try {
+      val intval = value.trim.toInt
+      MemInfo(key) match {
+        case MemInfo.SIZE => size = intval
+        case MemInfo.RSS => rss = intval
+        case MemInfo.PSS => pss = intval
+        case MemInfo.SHARED_CLEAN => sharedClean = intval
+        case MemInfo.SHARED_DIRTY => sharedDirty = intval
+        case MemInfo.PRIVATE_CLEAN => privateClean = intval
+        case MemInfo.PRIVATE_DIRTY => privateDirty = intval
+        case MemInfo.REFERENCED => referenced = intval
+        case _ => // ignore keys we don't track
+      }
+    } catch {
+      case _: NumberFormatException => // ignore values that are not plain integers
+    }
+    this
+  }
+
+  def mem(): Int = {
+    // Math.min(sharedDirty, pss) + privateDirty + privateClean
+    rss
+  }
+}
+
+object MemInfo {
+  sealed abstract class MemInfoVal(val name: String) {
+    override def toString: String = name
+  }
+
+  def apply(name: String): MemInfoVal = {
+    values.collectFirst { case i if i.name.equalsIgnoreCase(name.trim) => i }.getOrElse(INVALID)
+  }
+
+  val values: Seq[MemInfoVal] = Seq(
+    SIZE, RSS, PSS, SHARED_CLEAN, SHARED_DIRTY, PRIVATE_CLEAN, PRIVATE_DIRTY,
+    REFERENCED, ANONYMOUS, ANON_HUGE_PAGES, SWAP, KERNEL_PAGE_SIZE, MMU_PAGE_SIZE)
+
+
+  case object SIZE extends MemInfoVal("Size")
+  case object RSS extends MemInfoVal("Rss")
+  case object PSS extends MemInfoVal("Pss")
+  case object SHARED_CLEAN extends MemInfoVal("Shared_Clean")
+  case object SHARED_DIRTY extends MemInfoVal("Shared_Dirty")
+  case object PRIVATE_CLEAN extends MemInfoVal("Private_Clean")
+  case object PRIVATE_DIRTY extends MemInfoVal("Private_Dirty")
+  case object REFERENCED extends MemInfoVal("Referenced")
+  case object ANONYMOUS extends MemInfoVal("Anonymous")
+  case object ANON_HUGE_PAGES extends MemInfoVal("Anon_Huge_Pages")
+  case object SWAP extends MemInfoVal("swap")
+  case object KERNEL_PAGE_SIZE extends MemInfoVal("kernelPageSize")
+  case object MMU_PAGE_SIZE extends MemInfoVal("mmuPageSize")
+  case object INVALID extends MemInfoVal("invalid")
 }
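
(Aside, not part of the patch: the two regexes introduced above split /proc/<pid>/smaps into per-mapping records; ADDRESS_PATTERN recognizes a mapping header line and starts a new ProcessSmapMemoryInfo, and MEM_INFO_PATTERN recognizes the "Key:   value kB" counter lines that follow it. Below is a minimal, self-contained sketch of that behaviour against a few hand-written sample lines; the object name and the sample values are invented for illustration, not taken from a real process.)

  object SmapsPatternDemo {
    // Same shapes as the patterns added in ResourceMonitorThread above.
    private val AddressPattern = raw"^([a-f0-9]*)-([a-f0-9]*)(\s)*([rxwps\-]*).*".r
    private val MemInfoPattern = raw"^([A-Z].*):[\s ]*(.*).*".r

    def main(args: Array[String]): Unit = {
      // Hypothetical lines in the /proc/<pid>/smaps layout.
      val sample = Seq(
        "7f2c4c000000-7f2c4c021000 rw-p 00000000 00:00 0",
        "Size:                132 kB",
        "Rss:                 116 kB")
      sample.foreach {
        case AddressPattern(start, end, _, perm) =>
          // A header line starts a new mapping record; its permission decides whether it is kept.
          println(s"new mapping $start-$end, permission '$perm'")
        case MemInfoPattern(key, value) =>
          // A counter line updates the most recent record via setMemInfo.
          println(s"counter ${key.trim} = ${value.replace("kB", "").trim} kB")
        case other =>
          println(s"ignored: $other")
      }
    }
  }
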
diff --git a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala
index cf9aa4c..b8b0c1b 100644
--- a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala
+++ b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala
@@ -17,19 +17,19 @@
 package ml.dmlc.xgboost4j.scala.spark
 
 import java.nio.file.Files
-import java.util.concurrent.LinkedBlockingDeque
+import java.util.concurrent.{LinkedBlockingDeque, TimeUnit}
 
 import scala.util.Random
-
 import ml.dmlc.xgboost4j.java.Rabit
 import ml.dmlc.xgboost4j.scala.DMatrix
 import ml.dmlc.xgboost4j.scala.rabit.RabitTracker
-
 import org.apache.spark.SparkContext
 import org.apache.spark.ml.feature.{LabeledPoint => MLLabeledPoint}
 import org.apache.spark.ml.linalg.{DenseVector, Vectors, Vector => SparkVector}
 import org.apache.spark.rdd.RDD
 import org.scalatest.FunSuite
+
+import scala.concurrent.duration.Duration
 
 class XGBoostGeneralSuite extends FunSuite with PerTest {
   test("test Rabit allreduce to validate Scala-implemented Rabit tracker") {
@@ -391,4 +391,9 @@
     assert(eval.eval(xgBoostModel.booster.predict(testSetDMatrix, outPutMargin = true),
       testSetDMatrix) < 0.1)
   }
+
+  test("resource monitor") {
+    val rm = new ResourceMonitorThread(Duration(10, TimeUnit.SECONDS))
+    println(rm.report().mkString("\n"))
+  }
 }
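
(Also an aside, not part of the patch: a rough usage sketch of how the monitor could be wrapped around a block of work, based only on the API visible in this change. The helper name and the try/finally placement are assumptions; where the thread is actually started and stopped in the real Spark job is not shown here.)

  import java.util.concurrent.TimeUnit
  import scala.concurrent.duration.Duration

  object MonitoredRun {
    // Hypothetical helper assuming ResourceMonitorThread as defined in this change.
    def withResourceMonitor[T](block: => T): T = {
      val rm = new ResourceMonitorThread(Duration(10, TimeUnit.SECONDS))
      rm.start()  // daemon thread: logs the Rss* lines, heap usage and the smaps summary every 10s
      try block finally rm.stopChecking()
    }
  }
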

-- 
To view, visit https://gerrit.wikimedia.org/r/395915
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ia5cdb36f31dd4512b048de74f2b2d769d2fa7acf
Gerrit-PatchSet: 1
Gerrit-Project: search/xgboost
Gerrit-Branch: master
Gerrit-Owner: EBernhardson <ebernhard...@wikimedia.org>
