Github user rezasafi commented on a diff in the pull request: https://github.com/apache/spark/pull/21916#discussion_r206375077 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io.{BufferedReader, File, FileInputStream, InputStreamReader} +import java.io.FileNotFoundException +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.Queue + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+class ProcfsBasedSystems extends ProcessTreeMetrics { + val procfsDir = "/proc/" + var isAvailable: Boolean = isItProcfsBased + val pid: Int = computePid() + val ptree: scala.collection.mutable.Map[ Int, Set[Int]] = + scala.collection.mutable.Map[ Int, Set[Int]]() + val PROCFS_STAT_FILE = "stat" + + + def isItProcfsBased: Boolean = { + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + if (testing) { + return true + } + try { + if (!Files.exists(Paths.get(procfsDir))) { + return false + } + } + catch { + case f: FileNotFoundException => return false + } + true + } + + + def computePid(): Int = { + if (!isAvailable) { + return -1; + } + val cmd = Array("bash", "-c", "echo $PPID") + val length = 10 + var out: Array[Byte] = Array.fill[Byte](length)(0) + Runtime.getRuntime.exec(cmd).getInputStream.read(out) + val pid = Integer.parseInt(new String(out, "UTF-8").trim) + return pid; + } + + + def createProcessTree(): Unit = { + if (!isAvailable) { + return + } + val queue: Queue[Int] = new Queue[Int]() + queue += pid + while( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPIds(p) + if(!c.isEmpty) { + queue ++= c + ptree += (p -> c.toSet) + } + else { + ptree += (p -> Set[Int]()) + } + } + } + + + def updateProcessTree(): Unit = { + if (!isAvailable) { + return + } + val queue: Queue[Int] = new Queue[Int]() + queue += pid + while( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPIds(p) + if(!c.isEmpty) { + queue ++= c + val preChildren = ptree.get(p) + preChildren match { + case Some(children) => if (!c.toSet.equals(children)) { + val diff: Set[Int] = children -- c.toSet + ptree.update(p, c.toSet ) + diff.foreach(ptree.remove(_)) + } + case None => ptree.update(p, c.toSet ) + } + } + else { + ptree.update(p, Set[Int]()) + } + } + } + + + /** + * Hadoop ProcfsBasedProcessTree class used regex and pattern matching to retrieve the memory + * info. 
I tried that but found it not correct during tests, so I used normal string analysis + * instead. The computation of RSS and Vmem are based on proc(5): + * http://man7.org/linux/man-pages/man5/proc.5.html + */ + def getProcessInfo(pid: Int): String = { + try { + val pidDir: File = new File(procfsDir, pid.toString) + val fReader = new InputStreamReader( + new FileInputStream( + new File(pidDir, PROCFS_STAT_FILE)), Charset.forName("UTF-8")) + val in: BufferedReader = new BufferedReader(fReader) + val procInfo = in.readLine + in.close + fReader.close + return procInfo + } catch { + case f: FileNotFoundException => return null + } + null + } + + + def getRSSInfo(): Long = { + if (!isAvailable) { + return -1 + } + updateProcessTree + val pids = ptree.keySet + var totalRss = 0L + for (p <- pids) { + totalRss += getProcessRSSInfo(p) + } + totalRss + } + + def getProcessRSSInfo(pid: Int): Long = { + val pInfo = getProcessInfo(pid) + if (pInfo != null) { + val pInfoSplit = pInfo.split(" ") + // According to proc(5) RSS is the 24th value when we read first line of /proc/[pid]/stat + return pInfoSplit(23).toLong + } + 0 + } + + def getVirtualMemInfo(): Long = { + if (!isAvailable) { + return -1 + } + // We won't call updateProcessTree here since we already did that when we + // computed RSS info + val pids = ptree.keySet + var totalVMem = 0L + for (p <- pids) { + totalVMem += getProcessVirtualMemInfo(p) + } + totalVMem + } + + + def getProcessVirtualMemInfo(pid: Int): Long = { + val pInfo = getProcessInfo(pid) + if (pInfo != null) { + val pInfoSplit = pInfo.split(" ") + // According to proc(5) Vmem is the 23rd value when we read first line of /proc/[pid]/stat + return pInfoSplit(22).toLong + } + 0L + } + + + def getChildPIds(pid: Int): ArrayBuffer[Int] = { + val cmd = Array("pgrep", "-P", pid.toString) + val input = Runtime.getRuntime.exec(cmd).getInputStream + var childPidsInByte: mutable.ArrayBuffer[Byte] = new mutable.ArrayBuffer() + var d = input.read() + while(d != -1) { 
+ childPidsInByte.append(d.asInstanceOf[Byte]) + d = input.read() + } + input.close() + val childPids = new String(childPidsInByte.toArray, "UTF-8").split("\n") + val childPidsInInt: ArrayBuffer[Int] = new ArrayBuffer[Int]() + for(p <- childPids ) { + if (p.matches("[0-9][0-9]*")) { --- End diff -- I will change this. My goal was originally to guard against cases where `pgrep -P pid` returns error messages or other unexpected output, but it seems that it won't.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org