[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-12-05 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r239185785
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Try
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+
+private[spark] case class ProcfsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+private[spark] class ProcfsMetricsGetter(procfsDir: String = "/proc/") 
extends Logging {
+  private val procfsStatFile = "stat"
+  private val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+  private val pageSize = computePageSize()
+  private var isAvailable: Boolean = isProcfsAvailable
+  private val pid = computePid()
+
+  private lazy val isProcfsAvailable: Boolean = {
+if (testing) {
+   true
+}
+else {
+  val procDirExists = Try(Files.exists(Paths.get(procfsDir))).recover {
+case ioe: IOException =>
+  logWarning("Exception checking for procfs dir", ioe)
+  false
+  }
+  val shouldLogStageExecutorMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
+  val shouldLogStageExecutorProcessTreeMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
+  procDirExists.get && shouldLogStageExecutorProcessTreeMetrics && 
shouldLogStageExecutorMetrics
+}
+  }
+
+  private def computePid(): Int = {
+if (!isAvailable || testing) {
+  return -1;
+}
+try {
+  // This can be simplified in java9:
+  // 
https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
+  val cmd = Array("bash", "-c", "echo $PPID")
+  val out = Utils.executeAndGetOutput(cmd)
+  val pid = Integer.parseInt(out.split("\n")(0))
+  return pid;
--- End diff --

super nit: no need for the explicit `return` or the intermediate `val pid`; just make 
the final statement be `Integer.parseInt(out.split("\n")(0))`
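
For illustration, a minimal sketch of the method with that change applied (otherwise identical to the quoted diff):

```scala
private def computePid(): Int = {
  if (!isAvailable || testing) {
    return -1
  }
  try {
    // This can be simplified in java9:
    // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
    val cmd = Array("bash", "-c", "echo $PPID")
    val out = Utils.executeAndGetOutput(cmd)
    // the last expression is the result; no explicit return or temporary val needed
    Integer.parseInt(out.split("\n")(0))
  } catch {
    case e: SparkException =>
      logWarning("Exception when trying to compute process tree." +
        " As a result reporting of ProcessTree metrics is stopped", e)
      isAvailable = false
      -1
  }
}
```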


---




[GitHub] spark issue #23236: [SPARK-26275][PYTHON][ML] Increases timeout for Streamin...

2018-12-05 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/23236
  
lgtm


---




[GitHub] spark issue #23223: [SPARK-26269][YARN]Yarnallocator should have same blackl...

2018-12-05 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/23223
  
@attilapiros 

> I mean if node blacklisting in Spark would be perfectly aligned to YARN 
then it would be just redundant to have it in Spark in the first place.

I'm not super familiar with exactly how the blacklisting works in YARN itself -- it 
looks like it only blacklists the node for the AM, not nodes for general containers.  
I don't totally follow where the `KILLED_BY_RESOURCEMANAGER` status is generated, but 
it does seem like a good idea to protect against it: maybe there is a race where the 
container is created by the RM, but it gets killed with `KILLED_BY_RESOURCEMANAGER` 
before it reports back to the driver.  (Another reason I'm curious whether @Ngone51 is 
actually seeing this cause problems, or just noticed a case to improve.)


---




[GitHub] spark pull request #23223: [SPARK-26269][YARN]Yarnallocator should have same...

2018-12-05 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/23223#discussion_r239174670
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
 ---
@@ -612,11 +612,14 @@ private[yarn] class YarnAllocator(
 val message = "Container killed by YARN for exceeding physical 
memory limits. " +
   s"$diag Consider boosting ${EXECUTOR_MEMORY_OVERHEAD.key}."
 (true, message)
+  case exit_status if 
NOT_APP_AND_SYSTEM_FAULT_EXIT_STATUS.contains(exit_status) =>
--- End diff --

also, after this gets rearranged, I'd leave a comment here pointing to the Hadoop code 
you linked to on the JIRA.


---




[GitHub] spark pull request #23223: [SPARK-26269][YARN]Yarnallocator should have same...

2018-12-05 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/23223#discussion_r239173889
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
 ---
@@ -417,4 +426,59 @@ class YarnAllocatorSuite extends SparkFunSuite with 
Matchers with BeforeAndAfter
 clock.advance(50 * 1000L)
 handler.getNumExecutorsFailed should be (0)
   }
+
+  test("SPARK-26296: YarnAllocator should have same blacklist behaviour 
with YARN") {
+val rmClientSpy = spy(rmClient)
+val maxExecutors = 11
+
+val handler = createAllocator(
+  maxExecutors,
+  rmClientSpy,
+  Map(
+"spark.yarn.blacklist.executor.launch.blacklisting.enabled" -> 
"true",
+"spark.blacklist.application.maxFailedExecutorsPerNode" -> "0"))
+handler.updateResourceRequests()
+
+val hosts = (0 until maxExecutors).map(i => s"host$i")
+val ids = (0 to maxExecutors).map(i => 
ContainerId.newContainerId(appAttemptId, i))
+val containers = createContainers(hosts, ids)
+handler.handleAllocatedContainers(containers.slice(0, 9))
+val cs0 = ContainerStatus.newInstance(containers(0).getId, 
ContainerState.COMPLETE,
+  "success", ContainerExitStatus.SUCCESS)
+val cs1 = ContainerStatus.newInstance(containers(1).getId, 
ContainerState.COMPLETE,
+  "preempted", ContainerExitStatus.PREEMPTED)
+val cs2 = ContainerStatus.newInstance(containers(2).getId, 
ContainerState.COMPLETE,
+  "killed_exceeded_vmem", ContainerExitStatus.KILLED_EXCEEDED_VMEM)
+val cs3 = ContainerStatus.newInstance(containers(3).getId, 
ContainerState.COMPLETE,
+  "killed_exceeded_pmem", ContainerExitStatus.KILLED_EXCEEDED_PMEM)
+val cs4 = ContainerStatus.newInstance(containers(4).getId, 
ContainerState.COMPLETE,
+  "killed_by_resourcemanager", 
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER)
+val cs5 = ContainerStatus.newInstance(containers(5).getId, 
ContainerState.COMPLETE,
+  "killed_by_appmaster", ContainerExitStatus.KILLED_BY_APPMASTER)
+val cs6 = ContainerStatus.newInstance(containers(6).getId, 
ContainerState.COMPLETE,
+  "killed_after_app_completion", 
ContainerExitStatus.KILLED_AFTER_APP_COMPLETION)
+val cs7 = ContainerStatus.newInstance(containers(7).getId, 
ContainerState.COMPLETE,
+  "aborted", ContainerExitStatus.ABORTED)
+val cs8 = ContainerStatus.newInstance(containers(8).getId, 
ContainerState.COMPLETE,
+  "disk_failed", ContainerExitStatus.DISKS_FAILED)
--- End diff --

just a suggestion, you can avoid some repetition here:

```scala
val nonBlacklistedStatuses = Seq(ContainerExitStatus.SUCCESS, ..., ContainerExitStatus.DISKS_FAILED)
val containerStatuses = nonBlacklistedStatuses.zipWithIndex.map { case (state, idx) =>
  ContainerStatus.newInstance(containers(idx).getId, ContainerState.COMPLETE, "diagnostics", state)
}
```


---




[GitHub] spark pull request #23092: [SPARK-26094][CORE][STREAMING] createNonEcFile cr...

2018-12-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/23092#discussion_r238426468
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala 
---
@@ -471,7 +471,11 @@ object SparkHadoopUtil {
 try {
   // Use reflection as this uses apis only avialable in hadoop 3
   val builderMethod = fs.getClass().getMethod("createFile", 
classOf[Path])
-  val builder = builderMethod.invoke(fs, path)
+  // the builder api does not resolve relative paths, nor does it 
create parent dirs, while
+  // the old api does.
+  fs.mkdirs(path.getParent())
--- End diff --

good point, just handled this
(sorry didn't see this earlier)


---




[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-12-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r238424342
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Try
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+
+private[spark] case class ProcfsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+private[spark] class ProcfsMetricsGetter(val procfsDir: String = "/proc/") 
extends Logging {
+  val procfsStatFile = "stat"
+  val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+  val pageSize = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
+  private val pid = computePid()
+
+  private lazy val isProcfsAvailable: Boolean = {
+if (testing) {
+   true
+}
+else {
+  val procDirExists = Try(Files.exists(Paths.get(procfsDir))).recover {
+case ioe: IOException =>
+  logWarning("Exception checking for procfs dir", ioe)
+  false
+  }
+  val shouldLogStageExecutorMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
+  val shouldLogStageExecutorProcessTreeMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
+  procDirExists.get && shouldLogStageExecutorProcessTreeMetrics && 
shouldLogStageExecutorMetrics
+}
+  }
+
+  private def computePid(): Int = {
+if (!isAvailable || testing) {
+  return -1;
+}
+try {
+  // This can be simplified in java9:
+  // 
https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
+  val cmd = Array("bash", "-c", "echo $PPID")
+  val out = Utils.executeAndGetOutput(cmd)
+  val pid = Integer.parseInt(out.split("\n")(0))
+  return pid;
+}
+catch {
+  case e: SparkException =>
+logWarning("Exception when trying to compute process tree." +
+  " As a result reporting of ProcessTree metrics is stopped", e)
+isAvailable = false
+-1
+}
+  }
+
+  private def computePageSize(): Long = {
+if (testing) {
+  return 4096;
+}
+try {
+  val cmd = Array("getconf", "PAGESIZE")
+  val out = Utils.executeAndGetOutput(cmd)
+  Integer.parseInt(out.split("\n")(0))
+} catch {
+  case e: Exception =>
+logWarning("Exception when trying to compute pagesize, as a" +
+  " result reporting of ProcessTree metrics is stopped")
+isAvailable = false
+0
+}
+  }
+
+  private def computeProcessTree(): Set[Int] = {
+if (!isAvailable || testing) {
+  return Set()
+}
+var ptree: Set[Int] = Set()
+ptree += pid
+val queue = mutable.Queue.empty[Int]
+queue += pid
+while ( !queue.isEmpty ) {
+  val p = queue.dequeue()
+  val c = getChildPids(p)
+  if (!c.isEmpty) {
+queue ++= c
+ptree ++= c.toSet
+  }
+}

[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-12-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r238388633
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Try
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+
+private[spark] case class ProcfsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+private[spark] class ProcfsMetricsGetter(val procfsDir: String = "/proc/") 
extends Logging {
--- End diff --

no, not a `var` -- for a constructor argument, you don't need to put in either `val` 
or `var`.


https://stackoverflow.com/questions/14694712/do-scala-constructor-parameters-default-to-private-val
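
For illustration, a minimal sketch of the difference (hypothetical `Reader` classes, not the PR's code):

```scala
// A plain constructor parameter is only visible inside the class body,
// effectively like a private val.
class Reader(procfsDir: String = "/proc/") {
  def statPath(pid: Int): String = s"$procfsDir$pid/stat"
}

// Adding `val` additionally exposes it as a public accessor on instances.
class PublicReader(val procfsDir: String = "/proc/")

new PublicReader().procfsDir   // compiles
// new Reader().procfsDir      // would not compile: not a member
```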


---




[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-12-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r238371374
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Try
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+
+private[spark] case class ProcfsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+private[spark] class ProcfsMetricsGetter(val procfsDir: String = "/proc/") 
extends Logging {
+  val procfsStatFile = "stat"
+  val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+  val pageSize = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
+  private val pid = computePid()
+
+  private lazy val isProcfsAvailable: Boolean = {
+if (testing) {
+   true
+}
+else {
+  val procDirExists = Try(Files.exists(Paths.get(procfsDir))).recover {
+case ioe: IOException =>
+  logWarning("Exception checking for procfs dir", ioe)
+  false
+  }
+  val shouldLogStageExecutorMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
+  val shouldLogStageExecutorProcessTreeMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
+  procDirExists.get && shouldLogStageExecutorProcessTreeMetrics && 
shouldLogStageExecutorMetrics
+}
+  }
+
+  private def computePid(): Int = {
+if (!isAvailable || testing) {
+  return -1;
+}
+try {
+  // This can be simplified in java9:
+  // 
https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
+  val cmd = Array("bash", "-c", "echo $PPID")
+  val out = Utils.executeAndGetOutput(cmd)
+  val pid = Integer.parseInt(out.split("\n")(0))
+  return pid;
+}
+catch {
+  case e: SparkException =>
+logWarning("Exception when trying to compute process tree." +
+  " As a result reporting of ProcessTree metrics is stopped", e)
+isAvailable = false
+-1
+}
+  }
+
+  private def computePageSize(): Long = {
+if (testing) {
+  return 4096;
+}
+try {
+  val cmd = Array("getconf", "PAGESIZE")
+  val out = Utils.executeAndGetOutput(cmd)
+  Integer.parseInt(out.split("\n")(0))
+} catch {
+  case e: Exception =>
+logWarning("Exception when trying to compute pagesize, as a" +
+  " result reporting of ProcessTree metrics is stopped")
+isAvailable = false
+0
+}
+  }
+
+  private def computeProcessTree(): Set[Int] = {
+if (!isAvailable || testing) {
+  return Set()
+}
+var ptree: Set[Int] = Set()
+ptree += pid
+val queue = mutable.Queue.empty[Int]
+queue += pid
+while ( !queue.isEmpty ) {
+  val p = queue.dequeue()
+  val c = getChildPids(p)
+  if (!c.isEmpty) {
+queue ++= c
+ptree ++= c.toSet
+  }
+}

[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-12-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r238363825
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Try
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+
+private[spark] case class ProcfsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+private[spark] class ProcfsMetricsGetter(val procfsDir: String = "/proc/") 
extends Logging {
+  val procfsStatFile = "stat"
+  val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+  val pageSize = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
--- End diff --

I think these can all be private.
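
A sketch of what that could look like on the fields from the quoted diff:

```scala
private val procfsStatFile = "stat"
private val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
private val pageSize = computePageSize()
private var isAvailable: Boolean = isProcfsAvailable
```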


---




[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-12-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r238366634
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Try
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+
+private[spark] case class ProcfsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+private[spark] class ProcfsMetricsGetter(val procfsDir: String = "/proc/") 
extends Logging {
+  val procfsStatFile = "stat"
+  val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+  val pageSize = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
+  private val pid = computePid()
+
+  private lazy val isProcfsAvailable: Boolean = {
+if (testing) {
+   true
+}
+else {
+  val procDirExists = Try(Files.exists(Paths.get(procfsDir))).recover {
+case ioe: IOException =>
+  logWarning("Exception checking for procfs dir", ioe)
+  false
+  }
+  val shouldLogStageExecutorMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
+  val shouldLogStageExecutorProcessTreeMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
+  procDirExists.get && shouldLogStageExecutorProcessTreeMetrics && 
shouldLogStageExecutorMetrics
+}
+  }
+
+  private def computePid(): Int = {
+if (!isAvailable || testing) {
+  return -1;
+}
+try {
+  // This can be simplified in java9:
+  // 
https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
+  val cmd = Array("bash", "-c", "echo $PPID")
+  val out = Utils.executeAndGetOutput(cmd)
+  val pid = Integer.parseInt(out.split("\n")(0))
+  return pid;
+}
+catch {
+  case e: SparkException =>
+logWarning("Exception when trying to compute process tree." +
+  " As a result reporting of ProcessTree metrics is stopped", e)
+isAvailable = false
+-1
+}
+  }
+
+  private def computePageSize(): Long = {
+if (testing) {
+  return 4096;
+}
+try {
+  val cmd = Array("getconf", "PAGESIZE")
+  val out = Utils.executeAndGetOutput(cmd)
+  Integer.parseInt(out.split("\n")(0))
+} catch {
+  case e: Exception =>
+logWarning("Exception when trying to compute pagesize, as a" +
+  " result reporting of ProcessTree metrics is stopped")
+isAvailable = false
+0
+}
+  }
+
+  private def computeProcessTree(): Set[Int] = {
+if (!isAvailable || testing) {
+  return Set()
+}
+var ptree: Set[Int] = Set()
+ptree += pid
+val queue = mutable.Queue.empty[Int]
+queue += pid
+while ( !queue.isEmpty ) {
+  val p = queue.dequeue()
+  val c = getChildPids(p)
+  if (!c.isEmpty) {
+queue ++= c
+ptree ++= c.toSet
+  }
+}

[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-12-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r238363142
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Try
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+
+private[spark] case class ProcfsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+private[spark] class ProcfsMetricsGetter(val procfsDir: String = "/proc/") 
extends Logging {
--- End diff --

`procfsDir` doesn't need to be a `val`


---




[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-30 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r237992560
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Try
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+
+private[spark] case class ProcfsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+private[spark] class ProcfsMetricsGetter(
+val procfsDir: String = "/proc/",
+val pSizeForTest: Long = 0) extends Logging {
+  val procfsStatFile = "stat"
+  val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+  val pageSize = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
+  private val pid = computePid()
+
+  private lazy val isProcfsAvailable: Boolean = {
+if (testing) {
+   true
+}
+else {
+  val procDirExists = Try(Files.exists(Paths.get(procfsDir))).recover {
+case ioe: IOException =>
+  logWarning("Exception checking for procfs dir", ioe)
+  false
+  }
+  val shouldLogStageExecutorMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
+  val shouldLogStageExecutorProcessTreeMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
+  procDirExists.get && shouldLogStageExecutorProcessTreeMetrics && 
shouldLogStageExecutorMetrics
+}
+  }
+
+  private def computePid(): Int = {
+if (!isAvailable || testing) {
+  return -1;
+}
+try {
+  // This can be simplified in java9:
+  // 
https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
+  val cmd = Array("bash", "-c", "echo $PPID")
+  val out = Utils.executeAndGetOutput(cmd)
+  val pid = Integer.parseInt(out.split("\n")(0))
+  return pid;
+}
+catch {
+  case e: SparkException =>
+logWarning("Exception when trying to compute process tree." +
+  " As a result reporting of ProcessTree metrics is stopped", e)
+isAvailable = false
+-1
+}
+  }
+
+  private def computePageSize(): Long = {
+if (testing) {
+  return pSizeForTest;
+}
+try {
+  val cmd = Array("getconf", "PAGESIZE")
+  val out = Utils.executeAndGetOutput(cmd)
+  Integer.parseInt(out.split("\n")(0))
+} catch {
+  case e: Exception =>
+logWarning("Exception when trying to compute pagesize, as a" +
+  " result reporting of ProcessTree metrics is stopped")
+isAvailable = false
+0
+}
+  }
+
+  private def computeProcessTree(): Set[Int] = {
+if (!isAvailable || testing) {
+  return Set()
+}
+var ptree: Set[Int] = Set()
+ptree += pid
+val queue = mutable.Queue.empty[Int]
+queue += pid
+while( !queue.isEmpty ) {
+  val p = queue.dequeue()
+  val c = getChildPids(p)
+  if(!c.isEmpty) {
+queue ++= c
  

[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-30 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r237982013
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Try
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+
+private[spark] case class ProcfsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+private[spark] class ProcfsMetricsGetter(
+val procfsDir: String = "/proc/",
+val pSizeForTest: Long = 0) extends Logging {
+  val procfsStatFile = "stat"
+  val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+  val pageSize = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
+  private val pid = computePid()
+
+  private lazy val isProcfsAvailable: Boolean = {
+if (testing) {
+   true
+}
+else {
+  val procDirExists = Try(Files.exists(Paths.get(procfsDir))).recover {
+case ioe: IOException =>
+  logWarning("Exception checking for procfs dir", ioe)
+  false
+  }
+  val shouldLogStageExecutorMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
+  val shouldLogStageExecutorProcessTreeMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
+  procDirExists.get && shouldLogStageExecutorProcessTreeMetrics && 
shouldLogStageExecutorMetrics
+}
+  }
+
+  private def computePid(): Int = {
+if (!isAvailable || testing) {
+  return -1;
+}
+try {
+  // This can be simplified in java9:
+  // 
https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
+  val cmd = Array("bash", "-c", "echo $PPID")
+  val out = Utils.executeAndGetOutput(cmd)
+  val pid = Integer.parseInt(out.split("\n")(0))
+  return pid;
+}
+catch {
+  case e: SparkException =>
+logWarning("Exception when trying to compute process tree." +
+  " As a result reporting of ProcessTree metrics is stopped", e)
+isAvailable = false
+-1
+}
+  }
+
+  private def computePageSize(): Long = {
+if (testing) {
+  return pSizeForTest;
+}
+try {
+  val cmd = Array("getconf", "PAGESIZE")
+  val out = Utils.executeAndGetOutput(cmd)
+  Integer.parseInt(out.split("\n")(0))
+} catch {
+  case e: Exception =>
+logWarning("Exception when trying to compute pagesize, as a" +
+  " result reporting of ProcessTree metrics is stopped")
+isAvailable = false
+0
+}
+  }
+
+  private def computeProcessTree(): Set[Int] = {
+if (!isAvailable || testing) {
+  return Set()
+}
+var ptree: Set[Int] = Set()
+ptree += pid
+val queue = mutable.Queue.empty[Int]
+queue += pid
+while( !queue.isEmpty ) {
+  val p = queue.dequeue()
+  val c = getChildPids(p)
+  if(!c.isEmpty) {
+queue ++= c
  

[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-30 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r237975861
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala ---
@@ -138,19 +138,22 @@ private[spark] class ProcfsMetricsGetter(
   }
   val stdoutThread = Utils.processStreamByLine("read stdout for pgrep",
 process.getInputStream, appendChildPid)
-  val error = process.getErrorStream
-  var errorString = ""
-  (0 until error.available()).foreach { i =>
-errorString += error.read()
-  }
+  val errorStringBuilder = new StringBuilder()
+  val stdErrThread = Utils.processStreamByLine(
+"stderr for pgrep",
+process.getErrorStream,
+{ line =>
+errorStringBuilder.append(line)
+  })
--- End diff --

nit: if the line-handling closure is multiline, you should indent the body more, but 
I'd just put it on one line:

```scala
val stdErrThread = Utils.processStreamByLine(
  "stderr for pgrep",
  process.getErrorStream,
  line => errorStringBuilder.append(line))
```


---




[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-30 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r237981353
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Try
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+
+private[spark] case class ProcfsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+private[spark] class ProcfsMetricsGetter(
+val procfsDir: String = "/proc/",
+val pSizeForTest: Long = 0) extends Logging {
+  val procfsStatFile = "stat"
+  val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+  val pageSize = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
+  private val pid = computePid()
+
+  private lazy val isProcfsAvailable: Boolean = {
+if (testing) {
+   true
+}
+else {
+  val procDirExists = Try(Files.exists(Paths.get(procfsDir))).recover {
+case ioe: IOException =>
+  logWarning("Exception checking for procfs dir", ioe)
+  false
+  }
+  val shouldLogStageExecutorMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
+  val shouldLogStageExecutorProcessTreeMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
+  procDirExists.get && shouldLogStageExecutorProcessTreeMetrics && 
shouldLogStageExecutorMetrics
+}
+  }
+
+  private def computePid(): Int = {
+if (!isAvailable || testing) {
+  return -1;
+}
+try {
+  // This can be simplified in java9:
+  // 
https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
+  val cmd = Array("bash", "-c", "echo $PPID")
+  val out = Utils.executeAndGetOutput(cmd)
+  val pid = Integer.parseInt(out.split("\n")(0))
+  return pid;
+}
+catch {
+  case e: SparkException =>
+logWarning("Exception when trying to compute process tree." +
+  " As a result reporting of ProcessTree metrics is stopped", e)
+isAvailable = false
+-1
+}
+  }
+
+  private def computePageSize(): Long = {
+if (testing) {
+  return pSizeForTest;
+}
+try {
+  val cmd = Array("getconf", "PAGESIZE")
+  val out = Utils.executeAndGetOutput(cmd)
+  Integer.parseInt(out.split("\n")(0))
+} catch {
+  case e: Exception =>
+logWarning("Exception when trying to compute pagesize, as a" +
+  " result reporting of ProcessTree metrics is stopped")
+isAvailable = false
+0
+}
+  }
+
+  private def computeProcessTree(): Set[Int] = {
+if (!isAvailable || testing) {
+  return Set()
+}
+var ptree: Set[Int] = Set()
+ptree += pid
+val queue = mutable.Queue.empty[Int]
+queue += pid
+while( !queue.isEmpty ) {
+  val p = queue.dequeue()
+  val c = getChildPids(p)
+  if(!c.isEmpty) {
+queue ++= c
  

[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-30 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r237978529
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Try
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+
+private[spark] case class ProcfsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+private[spark] class ProcfsMetricsGetter(
+val procfsDir: String = "/proc/",
+val pSizeForTest: Long = 0) extends Logging {
--- End diff --

I think it's pretty weird having this constructor argument only used for tests.  I'd 
either (a) always use this argument (and compute the page size in the default 
constructor), or (b) not make this a constructor argument at all, and just hardcode 
the value in `computePageSize` when testing (you only set it to one value during 
testing, so we don't need it to be parameterizable beyond that currently).

(b) should be pretty easy.
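
A minimal sketch of option (b), reusing the `computePageSize` from the quoted diff and hardcoding an assumed 4096-byte page size for tests:

```scala
private def computePageSize(): Long = {
  if (testing) {
    // hardcoded test value instead of the pSizeForTest constructor argument
    return 4096
  }
  try {
    val cmd = Array("getconf", "PAGESIZE")
    val out = Utils.executeAndGetOutput(cmd)
    Integer.parseInt(out.split("\n")(0))
  } catch {
    case e: Exception =>
      logWarning("Exception when trying to compute pagesize, as a" +
        " result reporting of ProcessTree metrics is stopped")
      isAvailable = false
      0
  }
}
```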


---




[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-30 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r237979881
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Try
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+
+private[spark] case class ProcfsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+private[spark] class ProcfsMetricsGetter(
+val procfsDir: String = "/proc/",
+val pSizeForTest: Long = 0) extends Logging {
+  val procfsStatFile = "stat"
+  val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+  val pageSize = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
+  private val pid = computePid()
+
+  private lazy val isProcfsAvailable: Boolean = {
+if (testing) {
+   true
+}
+else {
+  val procDirExists = Try(Files.exists(Paths.get(procfsDir))).recover {
+case ioe: IOException =>
+  logWarning("Exception checking for procfs dir", ioe)
+  false
+  }
+  val shouldLogStageExecutorMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
+  val shouldLogStageExecutorProcessTreeMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
+  procDirExists.get && shouldLogStageExecutorProcessTreeMetrics && 
shouldLogStageExecutorMetrics
+}
+  }
+
+  private def computePid(): Int = {
+if (!isAvailable || testing) {
+  return -1;
+}
+try {
+  // This can be simplified in java9:
+  // 
https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
+  val cmd = Array("bash", "-c", "echo $PPID")
+  val out = Utils.executeAndGetOutput(cmd)
+  val pid = Integer.parseInt(out.split("\n")(0))
+  return pid;
+}
+catch {
+  case e: SparkException =>
+logWarning("Exception when trying to compute process tree." +
+  " As a result reporting of ProcessTree metrics is stopped", e)
+isAvailable = false
+-1
+}
+  }
+
+  private def computePageSize(): Long = {
+if (testing) {
+  return pSizeForTest;
+}
+try {
+  val cmd = Array("getconf", "PAGESIZE")
+  val out = Utils.executeAndGetOutput(cmd)
+  Integer.parseInt(out.split("\n")(0))
+} catch {
+  case e: Exception =>
+logWarning("Exception when trying to compute pagesize, as a" +
+  " result reporting of ProcessTree metrics is stopped")
+isAvailable = false
+0
+}
+  }
+
+  private def computeProcessTree(): Set[Int] = {
+if (!isAvailable || testing) {
+  return Set()
+}
+var ptree: Set[Int] = Set()
+ptree += pid
+val queue = mutable.Queue.empty[Int]
+queue += pid
+while( !queue.isEmpty ) {
+  val p = queue.dequeue()
+  val c = getChildPids(p)
+  if(!c.isEmpty) {
--- End diff --

 

[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-30 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r237977304
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala ---
@@ -49,14 +47,14 @@ class ExecutorMetrics private[spark] extends 
Serializable {
   }
 
   /**
-   * Constructor: create the ExecutorMetrics with the values specified.
+   * Constructor: create the ExecutorMetrics with using a given map.
*
* @param executorMetrics map of executor metric name to value
*/
   private[spark] def this(executorMetrics: Map[String, Long]) {
 this()
-(0 until ExecutorMetricType.values.length).foreach { idx =>
-  metrics(idx) = 
executorMetrics.getOrElse(ExecutorMetricType.values(idx).name, 0L)
+ExecutorMetricType.metricToOffset.map { m =>
+  metrics(m._2) = executorMetrics.getOrElse(m._1, 0L)
--- End diff --

you can use pattern matching here.  Also, you're not returning anything from that 
loop, so `foreach` is more appropriate than `map`:

```scala
ExecutorMetricType.metricToOffset.foreach { case (name, idx) =>
  metrics(idx) = executorMetrics.getOrElse(name, 0L)
}
```


---




[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-30 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r237982177
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Try
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+
+private[spark] case class ProcfsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+private[spark] class ProcfsMetricsGetter(
+val procfsDir: String = "/proc/",
+val pSizeForTest: Long = 0) extends Logging {
+  val procfsStatFile = "stat"
+  val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+  val pageSize = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
+  private val pid = computePid()
+
+  private lazy val isProcfsAvailable: Boolean = {
+if (testing) {
+   true
+}
+else {
+  val procDirExists = Try(Files.exists(Paths.get(procfsDir))).recover {
+case ioe: IOException =>
+  logWarning("Exception checking for procfs dir", ioe)
+  false
+  }
+  val shouldLogStageExecutorMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
+  val shouldLogStageExecutorProcessTreeMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
+  procDirExists.get && shouldLogStageExecutorProcessTreeMetrics && 
shouldLogStageExecutorMetrics
+}
+  }
+
+  private def computePid(): Int = {
+if (!isAvailable || testing) {
+  return -1;
+}
+try {
+  // This can be simplified in java9:
+  // 
https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
+  val cmd = Array("bash", "-c", "echo $PPID")
+  val out = Utils.executeAndGetOutput(cmd)
+  val pid = Integer.parseInt(out.split("\n")(0))
+  return pid;
+}
+catch {
+  case e: SparkException =>
+logWarning("Exception when trying to compute process tree." +
+  " As a result reporting of ProcessTree metrics is stopped", e)
+isAvailable = false
+-1
+}
+  }
+
+  private def computePageSize(): Long = {
+if (testing) {
+  return pSizeForTest;
+}
+try {
+  val cmd = Array("getconf", "PAGESIZE")
+  val out = Utils.executeAndGetOutput(cmd)
+  Integer.parseInt(out.split("\n")(0))
+} catch {
+  case e: Exception =>
+logWarning("Exception when trying to compute pagesize, as a" +
+  " result reporting of ProcessTree metrics is stopped")
+isAvailable = false
+0
+}
+  }
+
+  private def computeProcessTree(): Set[Int] = {
+if (!isAvailable || testing) {
+  return Set()
+}
+var ptree: Set[Int] = Set()
+ptree += pid
+val queue = mutable.Queue.empty[Int]
+queue += pid
+while( !queue.isEmpty ) {
+  val p = queue.dequeue()
+  val c = getChildPids(p)
+  if(!c.isEmpty) {
+queue ++= c
  

[GitHub] spark pull request #23166: [SPARK-26201] Fix python broadcast with encryptio...

2018-11-30 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/23166#discussion_r237886286
  
--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala 
---
@@ -708,16 +709,36 @@ private[spark] class PythonBroadcast(@transient var 
path: String) extends Serial
   override def handleConnection(sock: Socket): Unit = {
 val env = SparkEnv.get
 val in = sock.getInputStream()
-val dir = new File(Utils.getLocalDir(env.conf))
-val file = File.createTempFile("broadcast", "", dir)
-path = file.getAbsolutePath
-val out = env.serializerManager.wrapForEncryption(new 
FileOutputStream(path))
+val abspath = new File(path).getAbsolutePath
+val out = env.serializerManager.wrapForEncryption(new 
FileOutputStream(abspath))
--- End diff --

yes, +1.  Sorry, I didn't mean to get things stuck on this; I just wanted to make sure 
I was actually following what was happening correctly.


---




[GitHub] spark pull request #23166: [SPARK-26201] Fix python broadcast with encryptio...

2018-11-29 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/23166#discussion_r237738802
  
--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala 
---
@@ -708,16 +709,36 @@ private[spark] class PythonBroadcast(@transient var 
path: String) extends Serial
   override def handleConnection(sock: Socket): Unit = {
 val env = SparkEnv.get
 val in = sock.getInputStream()
-val dir = new File(Utils.getLocalDir(env.conf))
-val file = File.createTempFile("broadcast", "", dir)
-path = file.getAbsolutePath
-val out = env.serializerManager.wrapForEncryption(new 
FileOutputStream(path))
+val abspath = new File(path).getAbsolutePath
+val out = env.serializerManager.wrapForEncryption(new 
FileOutputStream(abspath))
--- End diff --

yeah, I see how it was wrong before.  I'm saying that after you add 
`setupDecryptionServer`, that decryption server would still be reading from the value 
of `path`, which gets updated here, since it's the same object in the driver's JVM.

anyway, this isn't a big deal; I think it's better with your change.


---




[GitHub] spark issue #23058: [SPARK-25905][CORE] When getting a remote block, avoid f...

2018-11-29 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/23058
  
merged to master, thanks @wypoon 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23166: [SPARK-26201] Fix python broadcast with encryptio...

2018-11-29 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/23166#discussion_r237535187
  
--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala 
---
@@ -708,16 +709,36 @@ private[spark] class PythonBroadcast(@transient var 
path: String) extends Serial
   override def handleConnection(sock: Socket): Unit = {
 val env = SparkEnv.get
 val in = sock.getInputStream()
-val dir = new File(Utils.getLocalDir(env.conf))
-val file = File.createTempFile("broadcast", "", dir)
-path = file.getAbsolutePath
-val out = env.serializerManager.wrapForEncryption(new 
FileOutputStream(path))
+val abspath = new File(path).getAbsolutePath
+val out = env.serializerManager.wrapForEncryption(new 
FileOutputStream(abspath))
--- End diff --

just want to make sure I understand this part -- this change isn't 
necessary, right?  even in the old version, `path` gets updated here, so 
`setupDecryptionServer` would know where to read the data from.

that said, I do think your change makes more sense -- not sure why I didn't 
just use the supplied path in the first place.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23058: [SPARK-25905][CORE] When getting a remote block, ...

2018-11-28 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/23058#discussion_r237268983
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -789,21 +785,31 @@ private[spark] class BlockManager(
   }
 
   if (data != null) {
-// SPARK-24307 undocumented "escape-hatch" in case there are any 
issues in converting to
-// ChunkedByteBuffer, to go back to old code-path.  Can be removed 
post Spark 2.4 if
-// new path is stable.
-if (remoteReadNioBufferConversion) {
-  return Some(new ChunkedByteBuffer(data.nioByteBuffer()))
-} else {
-  return Some(ChunkedByteBuffer.fromManagedBuffer(data))
-}
+assert(!data.isInstanceOf[BlockManagerManagedBuffer])
--- End diff --

@wypoon sorry, one more thing -- can you add a comment here explaining why 
we have this assert?  Otherwise it looks kinda random.  Something like "The 
lifecycle of BlockManagerManagedBuffer is slightly different, in particular 
wrt disposing of offheap buffers.  Currently this path will never generate a 
BlockManagerManagedBuffer (as we've just fetched the bytes remotely) -- this 
assert is just to make sure that isn't changed in the future (or if it is, the 
lifecycle is reconsidered)."  I guess that's quite a long blurb, but I think 
it's worthwhile.
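
For illustration, a rough sketch of how that comment could sit above the 
assert (wording lifted from the suggestion above and lightly trimmed -- not 
the text that actually landed in the PR):

```scala
// The lifecycle of a BlockManagerManagedBuffer is slightly different, in
// particular wrt disposing of off-heap buffers.  Currently this path never
// produces a BlockManagerManagedBuffer (we have just fetched the bytes
// remotely), so this assert only guards against that silently changing in
// the future (or, if it does change, forces the lifecycle to be reconsidered).
assert(!data.isInstanceOf[BlockManagerManagedBuffer])
```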


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23058: [SPARK-25905][CORE] When getting a remote block, avoid f...

2018-11-28 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/23058
  
> causes any performance degradation compared to memory mapping

@ankuriitg good question, though if you look at what the old code was 
doing, it wasn't memory mapping the file, it was reading it into memory from a 
regular input stream, take a look at 
[`ChunkedByteBuffer.fromFile`](https://github.com/apache/spark/blob/fa0d4bf69929c5acd676d602e758a969713d19d8/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala#L192-L212)

basically doing the same thing this is doing now, but without the extra 
memory overhead.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23166: [SPARK-26201] Fix python broadcast with encryptio...

2018-11-28 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/23166#discussion_r237228157
  
--- Diff: python/pyspark/broadcast.py ---
@@ -118,8 +121,16 @@ def dump(self, value, f):
 f.close()
 
 def load_from_path(self, path):
-with open(path, 'rb', 1 << 20) as f:
-return self.load(f)
+# we only need to decrypt it here if its on the driver since 
executor
+# decryption handled already
+if self._sc is not None and self._sc._encryption_enabled:
--- End diff --

can you move the entire conditional check into `value()` instead, and keep 
`load_from_path` like it was before?  `value()` is already checking for 
different scenarios, and that keeps the meaning of this function more 
straightforward.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23166: [SPARK-26201] Fix python broadcast with encryption

2018-11-28 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/23166
  
> The only question I have is does this cause more memory usage on the 
driver because it has a reference to that broadcast value or is something else 
already holding on to it?

yeah good point.  I guess that would make you hold onto a reference to the 
original value, even if it was never used on the driver.  the proposal here is 
more like what is done in TorrentBroadcast.java -- it's written to the driver's 
block manager, and read back if you request the value.
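
As a rough sketch of that pattern (this assumes the internal `BlockManager` / 
`BroadcastBlockId` APIs and is only an illustration of the TorrentBroadcast 
approach, not what this PR ended up doing):

```scala
import org.apache.spark.SparkEnv
import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}

// on creation (driver side): persist the value in the driver's block manager
// instead of keeping a strong reference to it; `id` and `value` are assumed
// to be the broadcast id and the broadcast value
val blockId = BroadcastBlockId(id)
SparkEnv.get.blockManager.putSingle(blockId, value, StorageLevel.MEMORY_AND_DISK,
  tellMaster = false)

// later, when .value is requested on the driver: read it back from the block manager
val readBack: Option[Any] =
  SparkEnv.get.blockManager.getLocalValues(blockId).map(_.data.next())
```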


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23166: [SPARK-26201] Fix python broadcast with encryption

2018-11-28 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/23166
  
maybe a dumb question -- couldn't you just add

```python
self._value = value
```

[inside `__init__()` when on the 
driver](https://github.com/apache/spark/blob/master/python/pyspark/broadcast.py#L78)?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-27 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r236863378
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala ---
@@ -28,16 +28,14 @@ import org.apache.spark.metrics.ExecutorMetricType
 @DeveloperApi
 class ExecutorMetrics private[spark] extends Serializable {
 
-  // Metrics are indexed by ExecutorMetricType.values
-  private val metrics = new Array[Long](ExecutorMetricType.values.length)
-
+  private val metrics = new Array[Long](ExecutorMetricType.numMetrics)
--- End diff --

I'd keep a comment here explaining this array, just update that it's now 
indexed by `ExecutorMetricType.metricToOffset`.
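
For instance (just a sketch of the updated comment, not the exact wording 
that went in):

```scala
// Metrics are indexed by the offset given in ExecutorMetricType.metricToOffset
private val metrics = new Array[Long](ExecutorMetricType.numMetrics)
```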


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-27 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r236856360
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala ---
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Try
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+
+private[spark] case class ProcfsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+private[spark] class ProcfsMetricsGetter(
+val procfsDir: String = "/proc/",
+val pSizeForTest: Long = 0) extends Logging {
+  val procfsStatFile = "stat"
+  val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+  val pageSize = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
+  private val pid = computePid()
+
+  private lazy val isProcfsAvailable: Boolean = {
+if (testing) {
+   true
+}
+else {
+  val procDirExists = Try(Files.exists(Paths.get(procfsDir))).recover {
+case ioe: IOException =>
+  logWarning("Exception checking for procfs dir", ioe)
+  false
+  }
+  val shouldLogStageExecutorMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
+  val shouldLogStageExecutorProcessTreeMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
+  procDirExists.get && shouldLogStageExecutorProcessTreeMetrics && 
shouldLogStageExecutorMetrics
+}
+  }
+
+  private def computePid(): Int = {
+if (!isAvailable || testing) {
+  return -1;
+}
+try {
+  // This can be simplified in java9:
+  // 
https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
+  val cmd = Array("bash", "-c", "echo $PPID")
+  val out = Utils.executeAndGetOutput(cmd)
+  val pid = Integer.parseInt(out.split("\n")(0))
+  return pid;
+}
+catch {
+  case e: SparkException =>
+logWarning("Exception when trying to compute process tree." +
+  " As a result reporting of ProcessTree metrics is stopped", e)
+isAvailable = false
+-1
+}
+  }
+
+  private def computePageSize(): Long = {
+if (testing) {
+  return pSizeForTest;
+}
+try {
+  val cmd = Array("getconf", "PAGESIZE")
+  val out = Utils.executeAndGetOutput(cmd)
+  Integer.parseInt(out.split("\n")(0))
+} catch {
+  case e: Exception =>
+logWarning("Exception when trying to compute pagesize, as a" +
+  " result reporting of ProcessTree metrics is stopped")
+isAvailable = false
+0
+}
+  }
+
+  private def computeProcessTree(): Set[Int] = {
+if (!isAvailable || testing) {
+  return Set()
+}
+var ptree: Set[Int] = Set()
+ptree += pid
+val queue = mutable.Queue.empty[Int]
+queue += pid
+while( !queue.isEmpty ) {
+  val p = queue.dequeue()
+  val c = getChildPids(p)
+  if(!c.isEmpty) {
+queue ++= c
  

[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-27 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r236858045
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala ---
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Try
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+
+private[spark] case class ProcfsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+private[spark] class ProcfsMetricsGetter(
+val procfsDir: String = "/proc/",
+val pSizeForTest: Long = 0) extends Logging {
+  val procfsStatFile = "stat"
+  val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+  val pageSize = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
+  private val pid = computePid()
+
+  private lazy val isProcfsAvailable: Boolean = {
+if (testing) {
+   true
+}
+else {
+  val procDirExists = Try(Files.exists(Paths.get(procfsDir))).recover {
+case ioe: IOException =>
+  logWarning("Exception checking for procfs dir", ioe)
+  false
+  }
+  val shouldLogStageExecutorMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
+  val shouldLogStageExecutorProcessTreeMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
+  procDirExists.get && shouldLogStageExecutorProcessTreeMetrics && 
shouldLogStageExecutorMetrics
+}
+  }
+
+  private def computePid(): Int = {
+if (!isAvailable || testing) {
+  return -1;
+}
+try {
+  // This can be simplified in java9:
+  // 
https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
+  val cmd = Array("bash", "-c", "echo $PPID")
+  val out = Utils.executeAndGetOutput(cmd)
+  val pid = Integer.parseInt(out.split("\n")(0))
+  return pid;
+}
+catch {
+  case e: SparkException =>
+logWarning("Exception when trying to compute process tree." +
+  " As a result reporting of ProcessTree metrics is stopped", e)
+isAvailable = false
+-1
+}
+  }
+
+  private def computePageSize(): Long = {
+if (testing) {
+  return pSizeForTest;
+}
+try {
+  val cmd = Array("getconf", "PAGESIZE")
+  val out = Utils.executeAndGetOutput(cmd)
+  Integer.parseInt(out.split("\n")(0))
+} catch {
+  case e: Exception =>
+logWarning("Exception when trying to compute pagesize, as a" +
+  " result reporting of ProcessTree metrics is stopped")
+isAvailable = false
+0
+}
+  }
+
+  private def computeProcessTree(): Set[Int] = {
+if (!isAvailable || testing) {
+  return Set()
+}
+var ptree: Set[Int] = Set()
+ptree += pid
+val queue = mutable.Queue.empty[Int]
+queue += pid
+while( !queue.isEmpty ) {
+  val p = queue.dequeue()
+  val c = getChildPids(p)
+  if(!c.isEmpty) {
+queue ++= c
  

[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-27 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r236856995
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala ---
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Try
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+
+private[spark] case class ProcfsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+private[spark] class ProcfsMetricsGetter(
+val procfsDir: String = "/proc/",
+val pSizeForTest: Long = 0) extends Logging {
+  val procfsStatFile = "stat"
+  val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+  val pageSize = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
+  private val pid = computePid()
+
+  private lazy val isProcfsAvailable: Boolean = {
+if (testing) {
+   true
+}
+else {
+  val procDirExists = Try(Files.exists(Paths.get(procfsDir))).recover {
+case ioe: IOException =>
+  logWarning("Exception checking for procfs dir", ioe)
+  false
+  }
+  val shouldLogStageExecutorMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
+  val shouldLogStageExecutorProcessTreeMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
+  procDirExists.get && shouldLogStageExecutorProcessTreeMetrics && 
shouldLogStageExecutorMetrics
+}
+  }
+
+  private def computePid(): Int = {
+if (!isAvailable || testing) {
+  return -1;
+}
+try {
+  // This can be simplified in java9:
+  // 
https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
+  val cmd = Array("bash", "-c", "echo $PPID")
+  val out = Utils.executeAndGetOutput(cmd)
+  val pid = Integer.parseInt(out.split("\n")(0))
+  return pid;
+}
+catch {
+  case e: SparkException =>
+logWarning("Exception when trying to compute process tree." +
+  " As a result reporting of ProcessTree metrics is stopped", e)
+isAvailable = false
+-1
+}
+  }
+
+  private def computePageSize(): Long = {
+if (testing) {
+  return pSizeForTest;
+}
+try {
+  val cmd = Array("getconf", "PAGESIZE")
+  val out = Utils.executeAndGetOutput(cmd)
+  Integer.parseInt(out.split("\n")(0))
+} catch {
+  case e: Exception =>
+logWarning("Exception when trying to compute pagesize, as a" +
+  " result reporting of ProcessTree metrics is stopped")
+isAvailable = false
+0
+}
+  }
+
+  private def computeProcessTree(): Set[Int] = {
+if (!isAvailable || testing) {
+  return Set()
+}
+var ptree: Set[Int] = Set()
+ptree += pid
+val queue = mutable.Queue.empty[Int]
+queue += pid
+while( !queue.isEmpty ) {
+  val p = queue.dequeue()
+  val c = getChildPids(p)
+  if(!c.isEmpty) {
+queue ++= c
  

[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-27 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r236860940
  
--- Diff: 
core/src/test/scala/org/apache/spark/executor/ProcfsMetricsGetterSuite.scala ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import org.apache.spark.SparkFunSuite
+
+
+class ProcfsMetricsGetterSuite extends SparkFunSuite {
+
+  val p = new ProcfsMetricsGetter(getTestResourcePath("ProcessTree"), 
4096L)
--- End diff --

minor, can you rename this dir to "ProcfsMetrics"?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-27 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r236857465
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala ---
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Try
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+
+private[spark] case class ProcfsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+private[spark] class ProcfsMetricsGetter(
+val procfsDir: String = "/proc/",
+val pSizeForTest: Long = 0) extends Logging {
+  val procfsStatFile = "stat"
+  val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+  val pageSize = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
+  private val pid = computePid()
+
+  private lazy val isProcfsAvailable: Boolean = {
+if (testing) {
+   true
+}
+else {
+  val procDirExists = Try(Files.exists(Paths.get(procfsDir))).recover {
+case ioe: IOException =>
+  logWarning("Exception checking for procfs dir", ioe)
+  false
+  }
+  val shouldLogStageExecutorMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
+  val shouldLogStageExecutorProcessTreeMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
+  procDirExists.get && shouldLogStageExecutorProcessTreeMetrics && 
shouldLogStageExecutorMetrics
+}
+  }
+
+  private def computePid(): Int = {
+if (!isAvailable || testing) {
+  return -1;
+}
+try {
+  // This can be simplified in java9:
+  // 
https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
+  val cmd = Array("bash", "-c", "echo $PPID")
+  val out = Utils.executeAndGetOutput(cmd)
+  val pid = Integer.parseInt(out.split("\n")(0))
+  return pid;
+}
+catch {
+  case e: SparkException =>
+logWarning("Exception when trying to compute process tree." +
+  " As a result reporting of ProcessTree metrics is stopped", e)
+isAvailable = false
+-1
+}
+  }
+
+  private def computePageSize(): Long = {
+if (testing) {
+  return pSizeForTest;
+}
+try {
+  val cmd = Array("getconf", "PAGESIZE")
+  val out = Utils.executeAndGetOutput(cmd)
+  Integer.parseInt(out.split("\n")(0))
+} catch {
+  case e: Exception =>
+logWarning("Exception when trying to compute pagesize, as a" +
+  " result reporting of ProcessTree metrics is stopped")
+isAvailable = false
+0
+}
+  }
+
+  private def computeProcessTree(): Set[Int] = {
+if (!isAvailable || testing) {
+  return Set()
+}
+var ptree: Set[Int] = Set()
+ptree += pid
+val queue = mutable.Queue.empty[Int]
+queue += pid
+while( !queue.isEmpty ) {
+  val p = queue.dequeue()
+  val c = getChildPids(p)
+  if(!c.isEmpty) {
+queue ++= c
  

[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-27 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r236855952
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala ---
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Try
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+
+private[spark] case class ProcfsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+private[spark] class ProcfsMetricsGetter(
+val procfsDir: String = "/proc/",
+val pSizeForTest: Long = 0) extends Logging {
+  val procfsStatFile = "stat"
+  val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+  val pageSize = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
+  private val pid = computePid()
+
+  private lazy val isProcfsAvailable: Boolean = {
+if (testing) {
+   true
+}
+else {
+  val procDirExists = Try(Files.exists(Paths.get(procfsDir))).recover {
+case ioe: IOException =>
+  logWarning("Exception checking for procfs dir", ioe)
+  false
+  }
+  val shouldLogStageExecutorMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
+  val shouldLogStageExecutorProcessTreeMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
+  procDirExists.get && shouldLogStageExecutorProcessTreeMetrics && 
shouldLogStageExecutorMetrics
+}
+  }
+
+  private def computePid(): Int = {
+if (!isAvailable || testing) {
+  return -1;
+}
+try {
+  // This can be simplified in java9:
+  // 
https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
+  val cmd = Array("bash", "-c", "echo $PPID")
+  val out = Utils.executeAndGetOutput(cmd)
+  val pid = Integer.parseInt(out.split("\n")(0))
+  return pid;
+}
+catch {
+  case e: SparkException =>
+logWarning("Exception when trying to compute process tree." +
+  " As a result reporting of ProcessTree metrics is stopped", e)
+isAvailable = false
+-1
+}
+  }
+
+  private def computePageSize(): Long = {
+if (testing) {
+  return pSizeForTest;
+}
+try {
+  val cmd = Array("getconf", "PAGESIZE")
+  val out = Utils.executeAndGetOutput(cmd)
+  Integer.parseInt(out.split("\n")(0))
+} catch {
+  case e: Exception =>
+logWarning("Exception when trying to compute pagesize, as a" +
+  " result reporting of ProcessTree metrics is stopped")
+isAvailable = false
+0
+}
+  }
+
+  private def computeProcessTree(): Set[Int] = {
+if (!isAvailable || testing) {
+  return Set()
+}
+var ptree: Set[Int] = Set()
+ptree += pid
+val queue = mutable.Queue.empty[Int]
+queue += pid
+while( !queue.isEmpty ) {
+  val p = queue.dequeue()
+  val c = getChildPids(p)
+  if(!c.isEmpty) {
+queue ++= c
  

[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-27 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r236860104
  
--- Diff: 
core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala ---
@@ -95,10 +136,22 @@ private[spark] object ExecutorMetricType {
 OnHeapUnifiedMemory,
 OffHeapUnifiedMemory,
 DirectPoolMemory,
-MappedPoolMemory
+MappedPoolMemory,
+ProcessTreeMetrics
   )
 
-  // Map of executor metric type to its index in values.
-  val metricIdxMap =
-Map[ExecutorMetricType, Int](ExecutorMetricType.values.zipWithIndex: 
_*)
+
+  val (metricToOffset, numMetrics) = {
+var numberOfMetrics = 0
+val definedMetricsAndOffset = mutable.LinkedHashMap.empty[String, Int]
+metricGetters.foreach { m =>
+  var metricInSet = 0
+  while (metricInSet < m.names.length) {
+definedMetricsAndOffset += (m.names(metricInSet) -> (metricInSet + 
numberOfMetrics))
+metricInSet += 1
+  }
--- End diff --

```scala
(0 until m.names.length).foreach { idx =>
  definedMetricsAndOffset += (m.names(idx) -> (idx + numberOfMetrics))
}
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-27 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r236855451
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala ---
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Try
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+
+private[spark] case class ProcfsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+private[spark] class ProcfsMetricsGetter(
+val procfsDir: String = "/proc/",
+val pSizeForTest: Long = 0) extends Logging {
+  val procfsStatFile = "stat"
+  val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+  val pageSize = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
+  private val pid = computePid()
+
+  private lazy val isProcfsAvailable: Boolean = {
+if (testing) {
+   true
+}
+else {
+  val procDirExists = Try(Files.exists(Paths.get(procfsDir))).recover {
+case ioe: IOException =>
+  logWarning("Exception checking for procfs dir", ioe)
+  false
+  }
+  val shouldLogStageExecutorMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
+  val shouldLogStageExecutorProcessTreeMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
+  procDirExists.get && shouldLogStageExecutorProcessTreeMetrics && 
shouldLogStageExecutorMetrics
+}
+  }
+
+  private def computePid(): Int = {
+if (!isAvailable || testing) {
+  return -1;
+}
+try {
+  // This can be simplified in java9:
+  // 
https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
+  val cmd = Array("bash", "-c", "echo $PPID")
+  val out = Utils.executeAndGetOutput(cmd)
+  val pid = Integer.parseInt(out.split("\n")(0))
+  return pid;
+}
+catch {
+  case e: SparkException =>
+logWarning("Exception when trying to compute process tree." +
+  " As a result reporting of ProcessTree metrics is stopped", e)
+isAvailable = false
+-1
+}
+  }
+
+  private def computePageSize(): Long = {
+if (testing) {
+  return pSizeForTest;
+}
+try {
+  val cmd = Array("getconf", "PAGESIZE")
+  val out = Utils.executeAndGetOutput(cmd)
+  Integer.parseInt(out.split("\n")(0))
+} catch {
+  case e: Exception =>
+logWarning("Exception when trying to compute pagesize, as a" +
+  " result reporting of ProcessTree metrics is stopped")
+isAvailable = false
+0
+}
+  }
+
+  private def computeProcessTree(): Set[Int] = {
+if (!isAvailable || testing) {
+  return Set()
+}
+var ptree: Set[Int] = Set()
+ptree += pid
+val queue = mutable.Queue.empty[Int]
+queue += pid
+while( !queue.isEmpty ) {
+  val p = queue.dequeue()
+  val c = getChildPids(p)
+  if(!c.isEmpty) {
+queue ++= c
  

[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-27 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r236848338
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala ---
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Try
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+
+private[spark] case class ProcfsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+private[spark] class ProcfsMetricsGetter(
+val procfsDir: String = "/proc/",
+val pSizeForTest: Long = 0) extends Logging {
+  val procfsStatFile = "stat"
+  val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+  val pageSize = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
+  private val pid = computePid()
+
+  private lazy val isProcfsAvailable: Boolean = {
+if (testing) {
+   true
+}
+else {
+  val procDirExists = Try(Files.exists(Paths.get(procfsDir))).recover {
+case ioe: IOException =>
+  logWarning("Exception checking for procfs dir", ioe)
+  false
+  }
+  val shouldLogStageExecutorMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
+  val shouldLogStageExecutorProcessTreeMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
+  procDirExists.get && shouldLogStageExecutorProcessTreeMetrics && 
shouldLogStageExecutorMetrics
+}
+  }
+
+  private def computePid(): Int = {
+if (!isAvailable || testing) {
+  return -1;
+}
+try {
+  // This can be simplified in java9:
+  // 
https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
+  val cmd = Array("bash", "-c", "echo $PPID")
+  val out = Utils.executeAndGetOutput(cmd)
+  val pid = Integer.parseInt(out.split("\n")(0))
+  return pid;
+}
+catch {
+  case e: SparkException =>
+logWarning("Exception when trying to compute process tree." +
+  " As a result reporting of ProcessTree metrics is stopped", e)
+isAvailable = false
+-1
+}
+  }
+
+  private def computePageSize(): Long = {
+if (testing) {
+  return pSizeForTest;
+}
+try {
+  val cmd = Array("getconf", "PAGESIZE")
+  val out = Utils.executeAndGetOutput(cmd)
+  Integer.parseInt(out.split("\n")(0))
+} catch {
+  case e: Exception =>
+logWarning("Exception when trying to compute pagesize, as a" +
+  " result reporting of ProcessTree metrics is stopped")
+isAvailable = false
+0
+}
+  }
+
+  private def computeProcessTree(): Set[Int] = {
+if (!isAvailable || testing) {
+  return Set()
+}
+var ptree: Set[Int] = Set()
+ptree += pid
+val queue = mutable.Queue.empty[Int]
+queue += pid
+while( !queue.isEmpty ) {
+  val p = queue.dequeue()
+  val c = getChildPids(p)
+  if(!c.isEmpty) {
+queue ++= c
  

[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-27 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r236847493
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala ---
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Try
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+
+private[spark] case class ProcfsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+private[spark] class ProcfsMetricsGetter(
+val procfsDir: String = "/proc/",
+val pSizeForTest: Long = 0) extends Logging {
+  val procfsStatFile = "stat"
+  val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+  val pageSize = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
+  private val pid = computePid()
+
+  private lazy val isProcfsAvailable: Boolean = {
+if (testing) {
+   true
+}
+else {
+  val procDirExists = Try(Files.exists(Paths.get(procfsDir))).recover {
+case ioe: IOException =>
+  logWarning("Exception checking for procfs dir", ioe)
+  false
+  }
+  val shouldLogStageExecutorMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
+  val shouldLogStageExecutorProcessTreeMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
+  procDirExists.get && shouldLogStageExecutorProcessTreeMetrics && 
shouldLogStageExecutorMetrics
+}
+  }
+
+  private def computePid(): Int = {
+if (!isAvailable || testing) {
+  return -1;
+}
+try {
+  // This can be simplified in java9:
+  // 
https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
+  val cmd = Array("bash", "-c", "echo $PPID")
+  val out = Utils.executeAndGetOutput(cmd)
+  val pid = Integer.parseInt(out.split("\n")(0))
+  return pid;
+}
+catch {
+  case e: SparkException =>
+logWarning("Exception when trying to compute process tree." +
+  " As a result reporting of ProcessTree metrics is stopped", e)
+isAvailable = false
+-1
+}
+  }
+
+  private def computePageSize(): Long = {
+if (testing) {
+  return pSizeForTest;
+}
+try {
+  val cmd = Array("getconf", "PAGESIZE")
+  val out = Utils.executeAndGetOutput(cmd)
+  Integer.parseInt(out.split("\n")(0))
+} catch {
+  case e: Exception =>
+logWarning("Exception when trying to compute pagesize, as a" +
+  " result reporting of ProcessTree metrics is stopped")
+isAvailable = false
+0
+}
+  }
+
+  private def computeProcessTree(): Set[Int] = {
+if (!isAvailable || testing) {
+  return Set()
+}
+var ptree: Set[Int] = Set()
+ptree += pid
+val queue = mutable.Queue.empty[Int]
+queue += pid
+while( !queue.isEmpty ) {
+  val p = queue.dequeue()
+  val c = getChildPids(p)
+  if(!c.isEmpty) {
+queue ++= c
  

[GitHub] spark issue #23058: [SPARK-25905][CORE] When getting a remote block, avoid f...

2018-11-27 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/23058
  
@mridulm @jerryshao @Ngone51 @vanzin just checking if you want to look at 
this before I merge, will leave open a bit.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-27 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r236769432
  
--- Diff: 
core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala ---
@@ -84,6 +122,8 @@ case object MappedPoolMemory extends 
MBeanExecutorMetricType(
   "java.nio:type=BufferPool,name=mapped")
 
 private[spark] object ExecutorMetricType {
+  final val pTreeInfo = new ProcfsBasedSystems
--- End diff --

Normally having an object helps make it clear that there is a singleton; 
it's easier to share properly and easier to figure out how to get a handle on 
it.  Given that we'll have a class anyway, I don't think there is a ton of 
value in having there be a companion object.

I do still think the instance you create here should go somewhere else.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-27 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r236766605
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+private[spark] case class ProcfsBasedSystemsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") 
extends Logging {
+  val procfsStatFile = "stat"
+  val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+  var pageSize = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
+  private val pid = computePid()
--- End diff --

You can't put it as a default value, but if you make it a static method, 
then you can provide an overloaded method which uses it, see 
https://github.com/squito/spark/commit/cf008355e8b9ce9faeab267cd0763a3859a5ccc9

But I think your other proposal is even better: if it's testing, just give 
it a fixed value (no need to even make it an argument to the constructor at 
all).
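
A minimal sketch of that "fixed value when testing" idea (the 4096 is just an 
example page size, and error handling is elided -- this isn't necessarily the 
variant the PR finally settled on):

```scala
private def computePageSize(): Long = {
  if (testing) {
    // under SPARK_TESTING / spark.testing, skip shelling out to `getconf`
    // and just use a typical page size
    return 4096L
  }
  val cmd = Array("getconf", "PAGESIZE")
  val out = Utils.executeAndGetOutput(cmd)
  Integer.parseInt(out.split("\n")(0))
}
```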


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23058: [SPARK-25905][CORE] When getting a remote block, avoid f...

2018-11-27 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/23058
  
@attilapiros yes, something like that would be possible.  I was thinking 
you'd just use the existing serializer methods to do it, something like:

```scala
val buffer = getRemoteManagedBuffer()
val valueItr = deserializeStream(buffer.createInputStream())
val result = valueItr.next()
assert(!valueItr.hasNext) // makes sure it's closed too
```

my reluctance to bother with it is that you'd still be getting a 
`DirectTaskResult`, which has the data sitting in a `ByteBuffer` anyway.  Also, 
it's not that big a deal, as this is only for a single task result, which is not 
large in general.  The change here is to avoid reading an entire partition into 
memory.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23117: [WIP][SPARK-7721][INFRA] Run and generate test co...

2018-11-26 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/23117#discussion_r236431048
  
--- Diff: dev/run-tests.py ---
@@ -434,6 +434,63 @@ def run_python_tests(test_modules, parallelism):
 run_cmd(command)
 
 
+def run_python_tests_with_coverage(test_modules, parallelism):
+set_title_and_block("Running PySpark tests with coverage report", 
"BLOCK_PYSPARK_UNIT_TESTS")
+
+command = [os.path.join(SPARK_HOME, "python", 
"run-tests-with-coverage")]
+if test_modules != [modules.root]:
+command.append("--modules=%s" % ','.join(m.name for m in 
test_modules))
+command.append("--parallelism=%i" % parallelism)
+run_cmd(command)
--- End diff --

this is mostly copied from ~L430, just "run-tests" -> 
"run-tests-with-coverage", could you refactor?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23117: [WIP][SPARK-7721][INFRA] Run and generate test co...

2018-11-26 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/23117#discussion_r236431733
  
--- Diff: dev/run-tests.py ---
@@ -434,6 +434,63 @@ def run_python_tests(test_modules, parallelism):
 run_cmd(command)
 
 
+def run_python_tests_with_coverage(test_modules, parallelism):
+set_title_and_block("Running PySpark tests with coverage report", 
"BLOCK_PYSPARK_UNIT_TESTS")
+
+command = [os.path.join(SPARK_HOME, "python", 
"run-tests-with-coverage")]
+if test_modules != [modules.root]:
+command.append("--modules=%s" % ','.join(m.name for m in 
test_modules))
+command.append("--parallelism=%i" % parallelism)
+run_cmd(command)
+post_python_tests_results()
+
+
+def post_python_tests_results():
+if "SPARK_TEST_KEY" not in os.environ:
+print("[error] 'SPARK_TEST_KEY' environment variable was not set. 
Unable to post"
+  "PySpark coverage results.")
+sys.exit(1)
--- End diff --

hmm, this will be a headache for us in our internal builds, as we also run 
these tests, and also set AMPLAB_JENKINS as it's sort of used as a catch-all for 
making builds quiet etc., but we won't have this key obviously.

you don't need to cater to our internal builds, of course, but I'm 
wondering if this will cause a headache for other users who want to run the 
tests themselves but won't have the key?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23106: [SPARK-26141] Enable custom metrics implementatio...

2018-11-26 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/23106#discussion_r236418843
  
--- Diff: 
core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java ---
@@ -242,8 +243,13 @@ private void writeSortedFile(boolean isLastFile) {
   // Note that we intentionally ignore the value of 
`writeMetricsToUse.shuffleWriteTime()`.
   // Consistent with ExternalSorter, we do not count this IO towards 
shuffle write time.
   // This means that this IO time is not accounted for anywhere; 
SPARK-3577 will fix this.
-  writeMetrics.incRecordsWritten(writeMetricsToUse.recordsWritten());
-  
taskContext.taskMetrics().incDiskBytesSpilled(writeMetricsToUse.bytesWritten());
+
+  // This is guaranteed to be a ShuffleWriteMetrics based on the if 
check in the beginning
+  // of this file.
--- End diff --

I found "beginning of this file" to be confusing, I thought you meant the 
beginning of `ShuffleExternalSorter.java`, not the spill file. maybe "beginning 
of this method"

also is the comment above this about SPARK-3577 out of date now that has 
been fixed?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23111: [SPARK-26148][PYTHON][TESTS] Increases default paralleli...

2018-11-26 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/23111
  
we might need to be careful that this doesn't unintentionally overload the 
jenkins workers so that we end up hitting more timeouts from too many things 
running concurrently (I dunno how isolated the workers are)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23111: [SPARK-26148][PYTHON][TESTS] Increases default paralleli...

2018-11-26 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/23111
  
wow, that's great!  glad there is a big speedup.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23109: [SPARK-26069][TESTS][FOLLOWUP]Add another possible error...

2018-11-26 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/23109
  
late review, but lgtm anyway


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23058: [SPARK-25905][CORE] When getting a remote block, avoid f...

2018-11-26 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/23058
  
lgtm

I looked more into the lifecycle of the buffers and when they get 
`disposed`, and it looks fine to me.  (In fact I think there is no need for the 
`dispose` in the first place, as hinted at here: 
https://github.com/apache/spark/pull/22511#issuecomment-424429691)

I also checked about whether we should buffer the input stream, but 
`dataDeserializeStream` already does that.

@wypoon one thing, can you update the testing section of the pr description 
to mention the coverage you found in the existing unit tests?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23113: [SPARK-26019][PYTHON] Fix race condition in accumulators...

2018-11-25 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/23113
  
I'm not sure if I follow the entire discussion here yet.  I did spend a 
while trying to figure out if there is a race condition, and I'm pretty sure 
there is not.  From your last comments, it looks like there is a bug in Zeppelin 
for not correctly ensuring that authentication is enabled on the java gateway, 
and as @HyukjinKwon indicates, perhaps a bug in spark for not failing more 
obviously. 

I don't think I understand this:

> auto token should be set to use Spark

what "auto token" are you referring to?


We should be able to support creating the java gateway inside Zeppelin; I 
think it's conceptually the same as what is done when you launch a pyspark 
shell.  Probably Zeppelin should be doing something like this: 
https://github.com/apache/spark/blob/master/python/pyspark/java_gateway.py#L40


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23090: [SPARK-26118][Web UI] Introducing spark.ui.requestHeader...

2018-11-20 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/23090
  
btw I agree this was bad judgement on my part to only backport to 2.4, 
sorry about that, and thanks for catching it @dongjoon-hyun.  I do think this 
fixes a bug (the ability to use the UI when you are a member of many user 
groups) and so should be backported further, though I would really be fine 
either way.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23090: [SPARK-26118][Web UI] Introducing spark.ui.requestHeader...

2018-11-20 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/23090
  
I think the default is pretty reasonable in most cases, in that this is the 
first time we've heard of someone hitting this limit.  I'm not sure how high we 
would make it to get around this problem in general.  And from the jetty docs: 
https://www.eclipse.org/jetty/javadoc/current/org/eclipse/jetty/server/HttpConfiguration.html#setRequestHeaderSize-int-

> Larger headers will allow for more and/or larger cookies plus larger form 
content encoded in a URL. However, larger headers consume more memory and can 
make a server more vulnerable to denial of service attacks.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-20 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r235124501
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+private[spark] case class ProcfsBasedSystemsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") 
extends Logging {
+  val procfsStatFile = "stat"
+  val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+  var pageSize = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
+  private val pid = computePid()
+
+  // var allMetrics: ProcfsBasedSystemsMetrics = 
ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0)
+
+  computeProcessTree()
+
+  private lazy val isProcfsAvailable: Boolean = {
+if (testing) {
+   true
+}
+else {
+  var procDirExists = true
+  try {
+if (!Files.exists(Paths.get(procfsDir))) {
+  procDirExists = false
+}
+  }
+  catch {
+case f: IOException =>
+  logWarning("It seems that procfs isn't supported", f)
+  procDirExists = false
+  }
+  val shouldLogStageExecutorMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
+  val shouldLogStageExecutorProcessTreeMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
+  procDirExists && shouldLogStageExecutorProcessTreeMetrics && 
shouldLogStageExecutorMetrics
+}
+  }
+
+  private def computePid(): Int = {
+if (!isAvailable || testing) {
+  return -1;
+}
+try {
+  // This can be simplified in java9:
+  // 
https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
+  val cmd = Array("bash", "-c", "echo $PPID")
+  val out2 = Utils.executeAndGetOutput(cmd)
+  val pid = Integer.parseInt(out2.split("\n")(0))
+  return pid;
+}
+catch {
+  case e: SparkException =>
+logWarning("Exception when trying to compute process tree." +
+" As a result reporting of ProcessTree metrics is stopped", e)
+isAvailable = false
+return -1
+}
+  }
+
+  private def computePageSize(): Long = {
+if (testing) {
+  return 0;
+}
+try {
+  val cmd = Array("getconf", "PAGESIZE")
+  val out = Utils.executeAndGetOutput(cmd)
+  return Integer.parseInt(out.split("\n")(0))
+} catch {
+  case e: Exception => logWarning("Exception when trying to compute 
pagesize, as a" +
+" result reporting of ProcessTree metrics is stopped")
+isAvailable = false
+return 0
+}
+  }
+
+  private def computeProcessTree(): Set[Int] = {
+if (!isAvailable || testing) {
+  return Set()
+}
+var ptree: Set[Int] = Set()
+ptree += pid
+val queue = mutable.Queue.empt

[GitHub] spark issue #23000: [SPARK-26002][SQL] Fix day of year calculation for Julia...

2018-11-20 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/23000
  
lgtm


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23090: [SPARK-26118][Web UI] Introducing spark.ui.requestHeader...

2018-11-20 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/23090
  
merged to master / 2.4


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23092: [SPARK-26094][CORE][STREAMING] createNonEcFile cr...

2018-11-19 Thread squito
GitHub user squito opened a pull request:

https://github.com/apache/spark/pull/23092

[SPARK-26094][CORE][STREAMING] createNonEcFile creates parent dirs.

## What changes were proposed in this pull request?

We explicitly avoid files with hdfs erasure coding for the streaming WAL
and for event logs, as hdfs EC does not support all relevant apis.
However, the new builder api used has different semantics -- it does not
create parent dirs, and it does not resolve relative paths.  This
updates createNonEcFile to have similar semantics to the old api.

## How was this patch tested?

Ran tests with the WAL pointed at a non-existent dir, which failed before 
this change.  Manually tested the new function with a relative path as well.
Unit tests via jenkins.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/squito/spark SPARK-26094

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23092.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23092


commit c52010a3fa291e85f75f862d2d75363e8505fcc7
Author: Imran Rashid 
Date:   2018-11-16T22:45:13Z

[SPARK-26094][CORE][STREAMING] createNonEcFile creates parent dirs.

We explicitly avoid files with hdfs erasure coding for the streaming WAL
and for event logs, as hdfs EC does not support all relevant apis.
However, the new builder api used has different semantics -- it does not
create parent dirs, and it does not resolve relative paths.  This
updates createNonEcFile to have similar semantics to the old api.




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23000: [SPARK-26002][SQL] Fix day of year calculation fo...

2018-11-19 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/23000#discussion_r234807971
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
 ---
@@ -410,6 +410,30 @@ class DateTimeUtilsSuite extends SparkFunSuite {
 assert(getDayInYear(getInUTCDays(c.getTimeInMillis)) === 78)
   }
 
+  test("SPARK-26002: correct day of year calculations for Julian calendar 
years") {
+TimeZone.setDefault(TimeZoneUTC)
+val c = Calendar.getInstance(TimeZoneUTC)
+c.set(Calendar.MILLISECOND, 0)
+(1000 to 1600 by 100).foreach { year =>
+  // January 1 is the 1st day of year.
+  c.set(year, 0, 1, 0, 0, 0)
+  assert(getYear(getInUTCDays(c.getTimeInMillis)) === year)
+  assert(getMonth(getInUTCDays(c.getTimeInMillis)) === 1)
+  assert(getDayInYear(getInUTCDays(c.getTimeInMillis)) === 1)
+
+  // March 1 is the 61st day of the year as they are leap years. It is 
true for
+  // even the multiples of 100 as before 1582-10-4 the Julian calendar 
leap year calculation
+  // is used in which every multiples of 4 are leap years
+  c.set(year, 2, 1, 0, 0, 0)
+  assert(getDayInYear(getInUTCDays(c.getTimeInMillis)) === 61)
+  assert(getMonth(getInUTCDays(c.getTimeInMillis)) === 3)
+
+  // For non-leap years:
+  c.set(year + 1, 2, 1, 0, 0, 0)
+  assert(getDayInYear(getInUTCDays(c.getTimeInMillis)) === 60)
+}
--- End diff --

this is good, but I think it's worth adding checks for a couple of special 
cases:

* 1582-10-3
* 1582-10-14 (though I guess the meaning of "dayInYear" is not so clear in 
this case)
* 1600-01-01
* 1600-03-01

I think they'll all be OK after your change, but good to have a check.
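
Roughly something like this, reusing the same `getDayInYear` / `getInUTCDays` 
helpers as the test above (untested sketch; the expected values are my own 
arithmetic, so please double-check them, and I've left out 1582-10-14 since 
day-in-year isn't well defined there):

```scala
import java.util.{Calendar, TimeZone}

val utc = TimeZone.getTimeZone("UTC")
val c = Calendar.getInstance(utc)
c.set(Calendar.MILLISECOND, 0)

// 1582-10-03: still in the Julian calendar, and 1582 is not a leap year,
// so October 3 should be day 276 (273 days through September, plus 3).
c.set(1582, 9, 3, 0, 0, 0)
assert(getDayInYear(getInUTCDays(c.getTimeInMillis)) === 276)

// 1600-01-01: first day of the year.
c.set(1600, 0, 1, 0, 0, 0)
assert(getDayInYear(getInUTCDays(c.getTimeInMillis)) === 1)

// 1600-03-01: 1600 is divisible by 400, so it is a leap year under both
// calendars and March 1 should be day 61.
c.set(1600, 2, 1, 0, 0, 0)
assert(getDayInYear(getInUTCDays(c.getTimeInMillis)) === 61)
```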


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23090: [SPARK-26118][Web UI] Introducing spark.ui.reques...

2018-11-19 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/23090#discussion_r234799817
  
--- Diff: docs/configuration.md ---
@@ -973,6 +973,14 @@ Apart from these, the following properties are also 
available, and may be useful
 spark.com.test.filter1.param.name2=bar
   
 
+
+  spark.ui.requestHeaderSize
+  8k
+  
+The HTTP request header size, in bytes unless otherwise specified.
+This setting applied for the Spark History Server too.
--- End diff --

I'd say "The maximum allowed size for a HTTP request header".

Also applied -> applies


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23058: [SPARK-25905][CORE] When getting a remote block, ...

2018-11-19 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/23058#discussion_r234779890
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -693,9 +693,9 @@ private[spark] class BlockManager(
*/
   private def getRemoteValues[T: ClassTag](blockId: BlockId): 
Option[BlockResult] = {
 val ct = implicitly[ClassTag[T]]
-getRemoteBytes(blockId).map { data =>
+getRemoteManagedBuffer(blockId).map { data =>
   val values =
-serializerManager.dataDeserializeStream(blockId, 
data.toInputStream(dispose = true))(ct)
+serializerManager.dataDeserializeStream(blockId, 
data.createInputStream())(ct)
--- End diff --

I'm pretty sure this is OK.  If you're getting remote bytes, you're never 
going to get a `BlockManagerManagedBuffer`.  We should probably add that as an 
assert, though, to make sure.

What still puzzles me, though, is why there was ever a `dispose=true` 
here.  That is something I'd like to look at more still.  I have a hard time 
believing it could do anything useful, as it's not guaranteed to get called in a 
`finally` or a taskCompletionListener etc.
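
On the assert: something minimal like this right after the fetch in 
`getRemoteValues` would make the assumption explicit (sketch only, not part of 
the patch):

```scala
// A buffer fetched from a remote block manager should never be a
// locally-managed BlockManagerManagedBuffer.
assert(!data.isInstanceOf[BlockManagerManagedBuffer],
  s"did not expect a BlockManagerManagedBuffer for remote block $blockId")
```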


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23058: [SPARK-25905][CORE] When getting a remote block, avoid f...

2018-11-19 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/23058
  
can we also make the same change to `TaskResultGetter`?  We could avoid the 
same inefficiency for large task results that also get fetched to disk.  (That 
would actually get us closer to removing the 2gb block limit on task results, 
but to fully remove it, we'd need to fix the task serialization in the 
executor, which is a little trickier.)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23058: [SPARK-25905][CORE] When getting a remote block, ...

2018-11-19 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/23058#discussion_r234326643
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -693,9 +693,9 @@ private[spark] class BlockManager(
*/
   private def getRemoteValues[T: ClassTag](blockId: BlockId): 
Option[BlockResult] = {
 val ct = implicitly[ClassTag[T]]
-getRemoteBytes(blockId).map { data =>
+getRemoteManagedBuffer(blockId).map { data =>
   val values =
-serializerManager.dataDeserializeStream(blockId, 
data.toInputStream(dispose = true))(ct)
+serializerManager.dataDeserializeStream(blockId, 
data.createInputStream())(ct)
--- End diff --

I noticed one detail here we're going to have to think very carefully 
about. We're losing `dispose = true`. One way we could still end up reading 
from a ChunkedByteBuffer is if `data` is a `BlockManagerManagedBuffer`, and its 
`data: BlockData` is a `ByteBufferBlockData`, which explicitly uses `dispose = 
false`:


https://github.com/apache/spark/blob/2aef79a65a145b76a88f1d4d9367091fd238b949/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L90-L94

need to look through the usage a bit more, but I think we might be fine since if 
you're getting a remote block, it can't be a BlockManagerManagedBuffer anyway.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23058: [SPARK-25905][CORE] When getting a remote block, avoid f...

2018-11-18 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/23058
  
@attilapiros can you review this please?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r234340260
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+private[spark] case class ProcfsBasedSystemsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") 
extends Logging {
+  val procfsStatFile = "stat"
+  val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+  var pageSize = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
+  private val pid = computePid()
+
+  // var allMetrics: ProcfsBasedSystemsMetrics = 
ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0)
+
+  computeProcessTree()
+
+  private lazy val isProcfsAvailable: Boolean = {
+if (testing) {
+   true
+}
+else {
+  var procDirExists = true
+  try {
+if (!Files.exists(Paths.get(procfsDir))) {
+  procDirExists = false
+}
+  }
+  catch {
+case f: IOException =>
+  logWarning("It seems that procfs isn't supported", f)
+  procDirExists = false
+  }
+  val shouldLogStageExecutorMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
+  val shouldLogStageExecutorProcessTreeMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
+  procDirExists && shouldLogStageExecutorProcessTreeMetrics && 
shouldLogStageExecutorMetrics
+}
+  }
+
+  private def computePid(): Int = {
+if (!isAvailable || testing) {
+  return -1;
+}
+try {
+  // This can be simplified in java9:
+  // 
https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
+  val cmd = Array("bash", "-c", "echo $PPID")
+  val out2 = Utils.executeAndGetOutput(cmd)
+  val pid = Integer.parseInt(out2.split("\n")(0))
+  return pid;
+}
+catch {
+  case e: SparkException =>
+logWarning("Exception when trying to compute process tree." +
+" As a result reporting of ProcessTree metrics is stopped", e)
+isAvailable = false
+return -1
+}
+  }
+
+  private def computePageSize(): Long = {
+if (testing) {
+  return 0;
+}
+try {
+  val cmd = Array("getconf", "PAGESIZE")
+  val out = Utils.executeAndGetOutput(cmd)
+  return Integer.parseInt(out.split("\n")(0))
+} catch {
+  case e: Exception => logWarning("Exception when trying to compute 
pagesize, as a" +
+" result reporting of ProcessTree metrics is stopped")
+isAvailable = false
+return 0
+}
+  }
+
+  private def computeProcessTree(): Set[Int] = {
+if (!isAvailable || testing) {
+  return Set()
+}
+var ptree: Set[Int] = Set()
+ptree += pid
+val queue = mutable.Queue.empt

[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r234340168
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+private[spark] case class ProcfsBasedSystemsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") 
extends Logging {
+  val procfsStatFile = "stat"
+  val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+  var pageSize = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
+  private val pid = computePid()
+
+  // var allMetrics: ProcfsBasedSystemsMetrics = 
ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0)
+
+  computeProcessTree()
+
+  private lazy val isProcfsAvailable: Boolean = {
+if (testing) {
+   true
+}
+else {
+  var procDirExists = true
+  try {
+if (!Files.exists(Paths.get(procfsDir))) {
+  procDirExists = false
+}
+  }
+  catch {
+case f: IOException =>
+  logWarning("It seems that procfs isn't supported", f)
+  procDirExists = false
+  }
+  val shouldLogStageExecutorMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
+  val shouldLogStageExecutorProcessTreeMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
+  procDirExists && shouldLogStageExecutorProcessTreeMetrics && 
shouldLogStageExecutorMetrics
+}
+  }
+
+  private def computePid(): Int = {
+if (!isAvailable || testing) {
+  return -1;
+}
+try {
+  // This can be simplified in java9:
+  // 
https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
+  val cmd = Array("bash", "-c", "echo $PPID")
+  val out2 = Utils.executeAndGetOutput(cmd)
+  val pid = Integer.parseInt(out2.split("\n")(0))
+  return pid;
+}
+catch {
+  case e: SparkException =>
+logWarning("Exception when trying to compute process tree." +
+" As a result reporting of ProcessTree metrics is stopped", e)
+isAvailable = false
+return -1
+}
+  }
+
+  private def computePageSize(): Long = {
+if (testing) {
+  return 0;
+}
+try {
+  val cmd = Array("getconf", "PAGESIZE")
+  val out = Utils.executeAndGetOutput(cmd)
+  return Integer.parseInt(out.split("\n")(0))
+} catch {
+  case e: Exception => logWarning("Exception when trying to compute 
pagesize, as a" +
+" result reporting of ProcessTree metrics is stopped")
+isAvailable = false
+return 0
--- End diff --

return isn't necessary


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r234343451
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+private[spark] case class ProcfsBasedSystemsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") 
extends Logging {
+  val procfsStatFile = "stat"
+  val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+  var pageSize = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
+  private val pid = computePid()
+
+  // var allMetrics: ProcfsBasedSystemsMetrics = 
ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0)
+
+  computeProcessTree()
+
+  private lazy val isProcfsAvailable: Boolean = {
+if (testing) {
+   true
+}
+else {
+  var procDirExists = true
+  try {
+if (!Files.exists(Paths.get(procfsDir))) {
+  procDirExists = false
+}
+  }
+  catch {
+case f: IOException =>
+  logWarning("It seems that procfs isn't supported", f)
+  procDirExists = false
+  }
+  val shouldLogStageExecutorMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
+  val shouldLogStageExecutorProcessTreeMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
+  procDirExists && shouldLogStageExecutorProcessTreeMetrics && 
shouldLogStageExecutorMetrics
+}
+  }
+
+  private def computePid(): Int = {
+if (!isAvailable || testing) {
+  return -1;
+}
+try {
+  // This can be simplified in java9:
+  // 
https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
+  val cmd = Array("bash", "-c", "echo $PPID")
+  val out2 = Utils.executeAndGetOutput(cmd)
+  val pid = Integer.parseInt(out2.split("\n")(0))
+  return pid;
+}
+catch {
+  case e: SparkException =>
+logWarning("Exception when trying to compute process tree." +
+" As a result reporting of ProcessTree metrics is stopped", e)
+isAvailable = false
+return -1
+}
+  }
+
+  private def computePageSize(): Long = {
+if (testing) {
+  return 0;
+}
+try {
+  val cmd = Array("getconf", "PAGESIZE")
+  val out = Utils.executeAndGetOutput(cmd)
+  return Integer.parseInt(out.split("\n")(0))
+} catch {
+  case e: Exception => logWarning("Exception when trying to compute 
pagesize, as a" +
+" result reporting of ProcessTree metrics is stopped")
+isAvailable = false
+return 0
+}
+  }
+
+  private def computeProcessTree(): Set[Int] = {
+if (!isAvailable || testing) {
+  return Set()
+}
+var ptree: Set[Int] = Set()
+ptree += pid
+val queue = mutable.Queue.empt

[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r234342703
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+private[spark] case class ProcfsBasedSystemsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") 
extends Logging {
+  val procfsStatFile = "stat"
+  val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+  var pageSize = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
+  private val pid = computePid()
+
+  // var allMetrics: ProcfsBasedSystemsMetrics = 
ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0)
+
+  computeProcessTree()
+
+  private lazy val isProcfsAvailable: Boolean = {
+if (testing) {
+   true
+}
+else {
+  var procDirExists = true
+  try {
+if (!Files.exists(Paths.get(procfsDir))) {
+  procDirExists = false
+}
+  }
+  catch {
+case f: IOException =>
+  logWarning("It seems that procfs isn't supported", f)
+  procDirExists = false
+  }
+  val shouldLogStageExecutorMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
+  val shouldLogStageExecutorProcessTreeMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
+  procDirExists && shouldLogStageExecutorProcessTreeMetrics && 
shouldLogStageExecutorMetrics
+}
+  }
+
+  private def computePid(): Int = {
+if (!isAvailable || testing) {
+  return -1;
+}
+try {
+  // This can be simplified in java9:
+  // 
https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
+  val cmd = Array("bash", "-c", "echo $PPID")
+  val out2 = Utils.executeAndGetOutput(cmd)
+  val pid = Integer.parseInt(out2.split("\n")(0))
+  return pid;
+}
+catch {
+  case e: SparkException =>
+logWarning("Exception when trying to compute process tree." +
+" As a result reporting of ProcessTree metrics is stopped", e)
+isAvailable = false
+return -1
+}
+  }
+
+  private def computePageSize(): Long = {
+if (testing) {
+  return 0;
+}
+try {
+  val cmd = Array("getconf", "PAGESIZE")
+  val out = Utils.executeAndGetOutput(cmd)
+  return Integer.parseInt(out.split("\n")(0))
+} catch {
+  case e: Exception => logWarning("Exception when trying to compute 
pagesize, as a" +
+" result reporting of ProcessTree metrics is stopped")
+isAvailable = false
+return 0
+}
+  }
+
+  private def computeProcessTree(): Set[Int] = {
+if (!isAvailable || testing) {
+  return Set()
+}
+var ptree: Set[Int] = Set()
+ptree += pid
+val queue = mutable.Queue.empt

[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r234337409
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+private[spark] case class ProcfsBasedSystemsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") 
extends Logging {
+  val procfsStatFile = "stat"
+  val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+  var pageSize = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
+  private val pid = computePid()
+
+  // var allMetrics: ProcfsBasedSystemsMetrics = 
ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0)
+
+  computeProcessTree()
--- End diff --

not needed anymore


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r234344505
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+private[spark] case class ProcfsBasedSystemsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") 
extends Logging {
+  val procfsStatFile = "stat"
+  val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+  var pageSize = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
+  private val pid = computePid()
+
+  // var allMetrics: ProcfsBasedSystemsMetrics = 
ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0)
+
+  computeProcessTree()
+
+  private lazy val isProcfsAvailable: Boolean = {
+if (testing) {
+   true
+}
+else {
+  var procDirExists = true
+  try {
+if (!Files.exists(Paths.get(procfsDir))) {
+  procDirExists = false
+}
+  }
+  catch {
+case f: IOException =>
+  logWarning("It seems that procfs isn't supported", f)
+  procDirExists = false
+  }
+  val shouldLogStageExecutorMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
+  val shouldLogStageExecutorProcessTreeMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
+  procDirExists && shouldLogStageExecutorProcessTreeMetrics && 
shouldLogStageExecutorMetrics
+}
+  }
+
+  private def computePid(): Int = {
+if (!isAvailable || testing) {
+  return -1;
+}
+try {
+  // This can be simplified in java9:
+  // 
https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
+  val cmd = Array("bash", "-c", "echo $PPID")
+  val out2 = Utils.executeAndGetOutput(cmd)
+  val pid = Integer.parseInt(out2.split("\n")(0))
+  return pid;
+}
+catch {
+  case e: SparkException =>
+logWarning("Exception when trying to compute process tree." +
+" As a result reporting of ProcessTree metrics is stopped", e)
+isAvailable = false
+return -1
+}
+  }
+
+  private def computePageSize(): Long = {
+if (testing) {
+  return 0;
+}
+try {
+  val cmd = Array("getconf", "PAGESIZE")
+  val out = Utils.executeAndGetOutput(cmd)
+  return Integer.parseInt(out.split("\n")(0))
+} catch {
+  case e: Exception => logWarning("Exception when trying to compute 
pagesize, as a" +
+" result reporting of ProcessTree metrics is stopped")
+isAvailable = false
+return 0
+}
+  }
+
+  private def computeProcessTree(): Set[Int] = {
+if (!isAvailable || testing) {
+  return Set()
+}
+var ptree: Set[Int] = Set()
+ptree += pid
+val queue = mutable.Queue.empt

[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r234341965
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+private[spark] case class ProcfsBasedSystemsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") 
extends Logging {
+  val procfsStatFile = "stat"
+  val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+  var pageSize = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
+  private val pid = computePid()
+
+  // var allMetrics: ProcfsBasedSystemsMetrics = 
ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0)
+
+  computeProcessTree()
+
+  private lazy val isProcfsAvailable: Boolean = {
+if (testing) {
+   true
+}
+else {
+  var procDirExists = true
+  try {
+if (!Files.exists(Paths.get(procfsDir))) {
+  procDirExists = false
+}
+  }
+  catch {
+case f: IOException =>
+  logWarning("It seems that procfs isn't supported", f)
+  procDirExists = false
+  }
+  val shouldLogStageExecutorMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
+  val shouldLogStageExecutorProcessTreeMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
+  procDirExists && shouldLogStageExecutorProcessTreeMetrics && 
shouldLogStageExecutorMetrics
+}
+  }
+
+  private def computePid(): Int = {
+if (!isAvailable || testing) {
+  return -1;
+}
+try {
+  // This can be simplified in java9:
+  // 
https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
+  val cmd = Array("bash", "-c", "echo $PPID")
+  val out2 = Utils.executeAndGetOutput(cmd)
+  val pid = Integer.parseInt(out2.split("\n")(0))
+  return pid;
+}
+catch {
+  case e: SparkException =>
+logWarning("Exception when trying to compute process tree." +
+" As a result reporting of ProcessTree metrics is stopped", e)
+isAvailable = false
+return -1
+}
+  }
+
+  private def computePageSize(): Long = {
+if (testing) {
+  return 0;
+}
+try {
+  val cmd = Array("getconf", "PAGESIZE")
+  val out = Utils.executeAndGetOutput(cmd)
+  return Integer.parseInt(out.split("\n")(0))
+} catch {
+  case e: Exception => logWarning("Exception when trying to compute 
pagesize, as a" +
+" result reporting of ProcessTree metrics is stopped")
+isAvailable = false
+return 0
+}
+  }
+
+  private def computeProcessTree(): Set[Int] = {
+if (!isAvailable || testing) {
+  return Set()
+}
+var ptree: Set[Int] = Set()
+ptree += pid
+val queue = mutable.Queue.empt

[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r234344631
  
--- Diff: 
core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala ---
@@ -19,25 +19,43 @@ package org.apache.spark.metrics
 import java.lang.management.{BufferPoolMXBean, ManagementFactory}
 import javax.management.ObjectName
 
+import scala.collection.mutable
+
+import org.apache.spark.executor.ProcfsBasedSystems
 import org.apache.spark.memory.MemoryManager
 
 /**
  * Executor metric types for executor-level metrics stored in 
ExecutorMetrics.
  */
 sealed trait ExecutorMetricType {
-  private[spark] def getMetricValue(memoryManager: MemoryManager): Long
-  private[spark] val name = 
getClass().getName().stripSuffix("$").split("""\.""").last
+  private[spark] def getMetricValues(memoryManager: MemoryManager): 
Array[Long] = {
+new Array[Long](0)
+  }
+  private[spark] def names: Seq[String] = Seq()
--- End diff --

neither of these methods should have default implementations.
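
i.e. keep them abstract, roughly like this (sketch based on the signatures above):

```scala
sealed trait ExecutorMetricType {
  // No default bodies -- every concrete metric type must implement both.
  private[spark] def getMetricValues(memoryManager: MemoryManager): Array[Long]
  private[spark] def names: Seq[String]
}
```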


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r234347780
  
--- Diff: 
core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala ---
@@ -84,6 +122,8 @@ case object MappedPoolMemory extends 
MBeanExecutorMetricType(
   "java.nio:type=BufferPool,name=mapped")
 
 private[spark] object ExecutorMetricType {
+  final val pTreeInfo = new ProcfsBasedSystems
--- End diff --

this is a weird place to keep this, unless there is some really good reason 
for it.  I think it should go inside `ProcessTreeMetrics`.

also I'm not sure what the problem was with making it an object.  Seems to 
work for me.  it's a bit different now as there are arguments to the constructor 
for testing -- but you could still have an object which extends the class

```scala
private[spark] object ProcfsBasedSystems extends 
ProcfsBasedSystems("/proc/")
```

though it doesn't really seem to have much value.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r234343988
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala ---
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.Queue
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.{config, Logging}
+
+private[spark] case class ProcfsBasedSystemsMetrics(jvmVmemTotal: Long,
+ jvmRSSTotal: Long,
+ pythonVmemTotal: Long,
+ pythonRSSTotal: Long,
+ otherVmemTotal: Long,
+ otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+private[spark] class ProcfsBasedSystems extends Logging {
+  var procfsDir = "/proc/"
+  val procfsStatFile = "stat"
+  var pageSize = 0
+  var isAvailable: Boolean = isItProcfsBased
+  private val pid: Int = computePid()
+  private val ptree: scala.collection.mutable.Map[ Int, Set[Int]] =
+scala.collection.mutable.Map[ Int, Set[Int]]()
+
+  var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 
0, 0, 0, 0, 0)
+  private var latestJVMVmemTotal: Long = 0
+  private var latestJVMRSSTotal: Long = 0
+  private var latestPythonVmemTotal: Long = 0
+  private var latestPythonRSSTotal: Long = 0
+  private var latestOtherVmemTotal: Long = 0
+  private var latestOtherRSSTotal: Long = 0
+
+  computeProcessTree()
+
+  private def isItProcfsBased: Boolean = {
+val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+if (testing) {
+  return true
+}
+try {
+  if (!Files.exists(Paths.get(procfsDir))) {
+return false
+  }
+}
+catch {
+  case f: FileNotFoundException => return false
+}
+val shouldLogStageExecutorMetrics =
+  SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
+val shouldLogStageExecutorProcessTreeMetrics =
+  SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
+shouldLogStageExecutorProcessTreeMetrics && 
shouldLogStageExecutorMetrics
+  }
+
+  private def computePid(): Int = {
+if (!isAvailable) {
+  return -1;
+}
+try {
+  // This can be simplified in java9:
+  // 
https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
+  val cmd = Array("bash", "-c", "echo $PPID")
+  val length = 10
+  val out: Array[Byte] = Array.fill[Byte](length)(0)
+  Runtime.getRuntime.exec(cmd).getInputStream.read(out)
+  val pid = Integer.parseInt(new String(out, "UTF-8").trim)
+  return pid;
+}
+catch {
+  case e: IOException => logDebug("IO Exception when trying to compute 
process tree." +
+" As a result reporting of ProcessTree metrics is stopped")
+isAvailable = false
+return -1
+  case _ => logDebug("Some exception occurred when trying to compute 
process tree. " +
+"As a result reporting of ProcessTree metrics is stopped")
+isAvailable = false
+return -1
+}
+  }
+
+  private def computePageSize(): Unit = {
+val cmd = Array("getconf", "PAGESIZE")
+   

[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r234334930
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala ---
@@ -28,16 +28,14 @@ import org.apache.spark.metrics.ExecutorMetricType
 @DeveloperApi
 class ExecutorMetrics private[spark] extends Serializable {
 
-  // Metrics are indexed by ExecutorMetricType.values
-  private val metrics = new Array[Long](ExecutorMetricType.values.length)
-
+  private val metrics = new Array[Long](ExecutorMetricType.numMetrics)
   // the first element is initialized to -1, indicating that the values 
for the array
   // haven't been set yet.
   metrics(0) = -1
 
-  /** Returns the value for the specified metricType. */
-  def getMetricValue(metricType: ExecutorMetricType): Long = {
-metrics(ExecutorMetricType.metricIdxMap(metricType))
+  /** Returns the value for the specified metric. */
+  def getMetricValue(metricName: String): Long = {
+metrics(ExecutorMetricType.metricToOffset.get(metricName).get)
--- End diff --

no point in `metricToOffset.get(...).get`.  if it's OK for this to throw an 
exception for a missing key, then just index the map directly, like before.
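
i.e. something like (sketch):

```scala
metrics(ExecutorMetricType.metricToOffset(metricName))
```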


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r234339484
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+private[spark] case class ProcfsBasedSystemsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") 
extends Logging {
+  val procfsStatFile = "stat"
+  val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+  var pageSize = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
+  private val pid = computePid()
+
+  // var allMetrics: ProcfsBasedSystemsMetrics = 
ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0)
+
+  computeProcessTree()
+
+  private lazy val isProcfsAvailable: Boolean = {
+if (testing) {
+   true
+}
+else {
+  var procDirExists = true
+  try {
+if (!Files.exists(Paths.get(procfsDir))) {
+  procDirExists = false
+}
+  }
+  catch {
+case f: IOException =>
+  logWarning("It seems that procfs isn't supported", f)
+  procDirExists = false
+  }
--- End diff --

this can be simplified to

```scala
val procDirExists = Try(Files.exists(Paths.get(procfsDir))).recover {
  case ioe: IOException =>
    logWarning("Exception checking for procfs dir", ioe)
    false
}
```

or maybe even ditch that warning msg completely ...


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r234340136
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+private[spark] case class ProcfsBasedSystemsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") 
extends Logging {
+  val procfsStatFile = "stat"
+  val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+  var pageSize = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
+  private val pid = computePid()
+
+  // var allMetrics: ProcfsBasedSystemsMetrics = 
ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0)
+
+  computeProcessTree()
+
+  private lazy val isProcfsAvailable: Boolean = {
+if (testing) {
+   true
+}
+else {
+  var procDirExists = true
+  try {
+if (!Files.exists(Paths.get(procfsDir))) {
+  procDirExists = false
+}
+  }
+  catch {
+case f: IOException =>
+  logWarning("It seems that procfs isn't supported", f)
+  procDirExists = false
+  }
+  val shouldLogStageExecutorMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
+  val shouldLogStageExecutorProcessTreeMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
+  procDirExists && shouldLogStageExecutorProcessTreeMetrics && 
shouldLogStageExecutorMetrics
+}
+  }
+
+  private def computePid(): Int = {
+if (!isAvailable || testing) {
+  return -1;
+}
+try {
+  // This can be simplified in java9:
+  // 
https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
+  val cmd = Array("bash", "-c", "echo $PPID")
+  val out2 = Utils.executeAndGetOutput(cmd)
+  val pid = Integer.parseInt(out2.split("\n")(0))
+  return pid;
+}
+catch {
+  case e: SparkException =>
+logWarning("Exception when trying to compute process tree." +
+" As a result reporting of ProcessTree metrics is stopped", e)
+isAvailable = false
+return -1
+}
+  }
+
+  private def computePageSize(): Long = {
+if (testing) {
+  return 0;
+}
+try {
+  val cmd = Array("getconf", "PAGESIZE")
+  val out = Utils.executeAndGetOutput(cmd)
+  return Integer.parseInt(out.split("\n")(0))
--- End diff --

return isn't necessary here


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r234337238
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala ---
@@ -69,9 +67,8 @@ class ExecutorMetrics private[spark] extends Serializable 
{
*/
   private[spark] def compareAndUpdatePeakValues(executorMetrics: 
ExecutorMetrics): Boolean = {
 var updated = false
-
-(0 until ExecutorMetricType.values.length).foreach { idx =>
-   if (executorMetrics.metrics(idx) > metrics(idx)) {
+ExecutorMetricType.metricToOffset.map { case (_, idx) =>
--- End diff --

minor, but I think foreach on a range is both clearer and more efficient 
here.  You could do `(0 until ExecutorMetricType.numMetrics).foreach`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r234339856
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+private[spark] case class ProcfsBasedSystemsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") 
extends Logging {
+  val procfsStatFile = "stat"
+  val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+  var pageSize = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
+  private val pid = computePid()
+
+  // var allMetrics: ProcfsBasedSystemsMetrics = 
ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0)
+
+  computeProcessTree()
+
+  private lazy val isProcfsAvailable: Boolean = {
+if (testing) {
+   true
+}
+else {
+  var procDirExists = true
+  try {
+if (!Files.exists(Paths.get(procfsDir))) {
+  procDirExists = false
+}
+  }
+  catch {
+case f: IOException =>
+  logWarning("It seems that procfs isn't supported", f)
+  procDirExists = false
+  }
+  val shouldLogStageExecutorMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
+  val shouldLogStageExecutorProcessTreeMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
+  procDirExists && shouldLogStageExecutorProcessTreeMetrics && 
shouldLogStageExecutorMetrics
+}
+  }
+
+  private def computePid(): Int = {
+if (!isAvailable || testing) {
+  return -1;
+}
+try {
+  // This can be simplified in java9:
+  // 
https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
+  val cmd = Array("bash", "-c", "echo $PPID")
+  val out2 = Utils.executeAndGetOutput(cmd)
+  val pid = Integer.parseInt(out2.split("\n")(0))
+  return pid;
+}
+catch {
+  case e: SparkException =>
+logWarning("Exception when trying to compute process tree." +
+" As a result reporting of ProcessTree metrics is stopped", e)
--- End diff --

nit: indent the 2nd line. 
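
i.e. something like (formatting only, message text unchanged):

```scala
logWarning("Exception when trying to compute process tree." +
  " As a result reporting of ProcessTree metrics is stopped", e)
```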


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r234344821
  
--- Diff: 
core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala ---
@@ -19,25 +19,43 @@ package org.apache.spark.metrics
 import java.lang.management.{BufferPoolMXBean, ManagementFactory}
 import javax.management.ObjectName
 
+import scala.collection.mutable
+
+import org.apache.spark.executor.ProcfsBasedSystems
 import org.apache.spark.memory.MemoryManager
 
 /**
  * Executor metric types for executor-level metrics stored in 
ExecutorMetrics.
  */
 sealed trait ExecutorMetricType {
-  private[spark] def getMetricValue(memoryManager: MemoryManager): Long
-  private[spark] val name = 
getClass().getName().stripSuffix("$").split("""\.""").last
+  private[spark] def getMetricValues(memoryManager: MemoryManager): 
Array[Long] = {
+new Array[Long](0)
+  }
+  private[spark] def names: Seq[String] = Seq()
+}
+
+sealed trait SingleValueExecutorMetricType extends ExecutorMetricType {
+  override private[spark] def names = Seq(getClass().getName().
+stripSuffix("$").split("""\.""").last)
+
+  override private[spark] def getMetricValues(memoryManager: 
MemoryManager): Array[Long] = {
+val metrics = new Array[Long](1)
+metrics(0) = getMetricValue(memoryManager)
+metrics
+  }
+
+  private[spark] def getMetricValue(memoryManager: MemoryManager): Long = 0
--- End diff --

no default implementation here either
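
i.e. leave it abstract so every single-value metric object must supply a value; a sketch
(`names` override omitted for brevity):

```scala
sealed trait SingleValueExecutorMetricType extends ExecutorMetricType {
  // abstract: each concrete metric object implements this
  private[spark] def getMetricValue(memoryManager: MemoryManager): Long

  override private[spark] def getMetricValues(memoryManager: MemoryManager): Array[Long] = {
    Array(getMetricValue(memoryManager))
  }
}
```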


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r234342892
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+private[spark] case class ProcfsBasedSystemsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") 
extends Logging {
+  val procfsStatFile = "stat"
+  val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+  var pageSize = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
+  private val pid = computePid()
+
+  // var allMetrics: ProcfsBasedSystemsMetrics = 
ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0)
+
+  computeProcessTree()
+
+  private lazy val isProcfsAvailable: Boolean = {
+if (testing) {
+   true
+}
+else {
+  var procDirExists = true
+  try {
+if (!Files.exists(Paths.get(procfsDir))) {
+  procDirExists = false
+}
+  }
+  catch {
+case f: IOException =>
+  logWarning("It seems that procfs isn't supported", f)
+  procDirExists = false
+  }
+  val shouldLogStageExecutorMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
+  val shouldLogStageExecutorProcessTreeMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
+  procDirExists && shouldLogStageExecutorProcessTreeMetrics && 
shouldLogStageExecutorMetrics
+}
+  }
+
+  private def computePid(): Int = {
+if (!isAvailable || testing) {
+  return -1;
+}
+try {
+  // This can be simplified in java9:
+  // 
https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
+  val cmd = Array("bash", "-c", "echo $PPID")
+  val out2 = Utils.executeAndGetOutput(cmd)
+  val pid = Integer.parseInt(out2.split("\n")(0))
+  return pid;
+}
+catch {
+  case e: SparkException =>
+logWarning("Exception when trying to compute process tree." +
+" As a result reporting of ProcessTree metrics is stopped", e)
+isAvailable = false
+return -1
+}
+  }
+
+  private def computePageSize(): Long = {
+if (testing) {
+  return 0;
+}
+try {
+  val cmd = Array("getconf", "PAGESIZE")
+  val out = Utils.executeAndGetOutput(cmd)
+  return Integer.parseInt(out.split("\n")(0))
+} catch {
+  case e: Exception => logWarning("Exception when trying to compute 
pagesize, as a" +
+" result reporting of ProcessTree metrics is stopped")
+isAvailable = false
+return 0
+}
+  }
+
+  private def computeProcessTree(): Set[Int] = {
+if (!isAvailable || testing) {
+  return Set()
+}
+var ptree: Set[Int] = Set()
+ptree += pid
+val queue = mutable.Queue.empt

[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r234338058
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+private[spark] case class ProcfsBasedSystemsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") 
extends Logging {
+  val procfsStatFile = "stat"
+  val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+  var pageSize = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
+  private val pid = computePid()
--- End diff --

`pageSize` is only a `var` for testing -- instead just optionally pass it 
in to the constructor

Also, I think all of these can be `private`.
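
A rough sketch of the constructor idea (the default page size shown here is only illustrative; the
real default would still come from `getconf PAGESIZE`, and tests would pass their own value):

```scala
private[spark] class ProcfsBasedSystems(
    val procfsDir: String = "/proc/",
    private val pageSize: Long = 4096L) extends Logging {

  private val procfsStatFile = "stat"
  private val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
  private var isAvailable: Boolean = isProcfsAvailable
  private val pid = computePid()
  // ...
}
```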


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r234334582
  
--- Diff: core/src/main/scala/org/apache/spark/Heartbeater.scala ---
@@ -60,11 +60,16 @@ private[spark] class Heartbeater(
   }
 
   /**
-   * Get the current executor level metrics. These are returned as an 
array, with the index
-   * determined by ExecutorMetricType.values
+   * Get the current executor level metrics. These are returned as an array
*/
   def getCurrentMetrics(): ExecutorMetrics = {
-val metrics = 
ExecutorMetricType.values.map(_.getMetricValue(memoryManager)).toArray
+val metrics = new Array[Long](ExecutorMetricType.numMetrics)
+var offset = 0
+ExecutorMetricType.metricGetters.foreach { metric =>
+  val newSetOfMetrics = metric.getMetricValues(memoryManager)
--- End diff --

nit: let's avoid "set" since order matters; you can just use `newMetrics`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r234339667
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+private[spark] case class ProcfsBasedSystemsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") 
extends Logging {
+  val procfsStatFile = "stat"
+  val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+  var pageSize = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
+  private val pid = computePid()
+
+  // var allMetrics: ProcfsBasedSystemsMetrics = 
ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0)
+
+  computeProcessTree()
+
+  private lazy val isProcfsAvailable: Boolean = {
+if (testing) {
+   true
+}
+else {
+  var procDirExists = true
+  try {
+if (!Files.exists(Paths.get(procfsDir))) {
+  procDirExists = false
+}
+  }
+  catch {
+case f: IOException =>
+  logWarning("It seems that procfs isn't supported", f)
+  procDirExists = false
+  }
+  val shouldLogStageExecutorMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
+  val shouldLogStageExecutorProcessTreeMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
+  procDirExists && shouldLogStageExecutorProcessTreeMetrics && 
shouldLogStageExecutorMetrics
+}
+  }
+
+  private def computePid(): Int = {
+if (!isAvailable || testing) {
+  return -1;
+}
+try {
+  // This can be simplified in java9:
+  // 
https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
+  val cmd = Array("bash", "-c", "echo $PPID")
+  val out2 = Utils.executeAndGetOutput(cmd)
--- End diff --

can be `out` instead of `out2`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r234348523
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+private[spark] case class ProcfsBasedSystemsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") 
extends Logging {
--- End diff --

I don't love these names, though I also suck at coming up with good ones.  
I think in particular the "Systems" part is too ambiguous to be useful.  How 
about

`ProcfsBasedSystemsMetrics` -> `ProcfsMetrics`
`ProcfsBasedSystems` -> `ProcfsMetricsGetter`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r234342770
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+private[spark] case class ProcfsBasedSystemsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") 
extends Logging {
+  val procfsStatFile = "stat"
+  val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+  var pageSize = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
+  private val pid = computePid()
+
+  // var allMetrics: ProcfsBasedSystemsMetrics = 
ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0)
+
+  computeProcessTree()
+
+  private lazy val isProcfsAvailable: Boolean = {
+if (testing) {
+   true
+}
+else {
+  var procDirExists = true
+  try {
+if (!Files.exists(Paths.get(procfsDir))) {
+  procDirExists = false
+}
+  }
+  catch {
+case f: IOException =>
+  logWarning("It seems that procfs isn't supported", f)
+  procDirExists = false
+  }
+  val shouldLogStageExecutorMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
+  val shouldLogStageExecutorProcessTreeMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
+  procDirExists && shouldLogStageExecutorProcessTreeMetrics && 
shouldLogStageExecutorMetrics
+}
+  }
+
+  private def computePid(): Int = {
+if (!isAvailable || testing) {
+  return -1;
+}
+try {
+  // This can be simplified in java9:
+  // 
https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
+  val cmd = Array("bash", "-c", "echo $PPID")
+  val out2 = Utils.executeAndGetOutput(cmd)
+  val pid = Integer.parseInt(out2.split("\n")(0))
+  return pid;
+}
+catch {
+  case e: SparkException =>
+logWarning("Exception when trying to compute process tree." +
+" As a result reporting of ProcessTree metrics is stopped", e)
+isAvailable = false
+return -1
+}
+  }
+
+  private def computePageSize(): Long = {
+if (testing) {
+  return 0;
+}
+try {
+  val cmd = Array("getconf", "PAGESIZE")
+  val out = Utils.executeAndGetOutput(cmd)
+  return Integer.parseInt(out.split("\n")(0))
+} catch {
+  case e: Exception => logWarning("Exception when trying to compute 
pagesize, as a" +
+" result reporting of ProcessTree metrics is stopped")
+isAvailable = false
+return 0
+}
+  }
+
+  private def computeProcessTree(): Set[Int] = {
+if (!isAvailable || testing) {
+  return Set()
+}
+var ptree: Set[Int] = Set()
+ptree += pid
+val queue = mutable.Queue.empt

[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r234343203
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+private[spark] case class ProcfsBasedSystemsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") 
extends Logging {
+  val procfsStatFile = "stat"
+  val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+  var pageSize = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
+  private val pid = computePid()
+
+  // var allMetrics: ProcfsBasedSystemsMetrics = 
ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0)
+
+  computeProcessTree()
+
+  private lazy val isProcfsAvailable: Boolean = {
+if (testing) {
+   true
+}
+else {
+  var procDirExists = true
+  try {
+if (!Files.exists(Paths.get(procfsDir))) {
+  procDirExists = false
+}
+  }
+  catch {
+case f: IOException =>
+  logWarning("It seems that procfs isn't supported", f)
+  procDirExists = false
+  }
+  val shouldLogStageExecutorMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
+  val shouldLogStageExecutorProcessTreeMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
+  procDirExists && shouldLogStageExecutorProcessTreeMetrics && 
shouldLogStageExecutorMetrics
+}
+  }
+
+  private def computePid(): Int = {
+if (!isAvailable || testing) {
+  return -1;
+}
+try {
+  // This can be simplified in java9:
+  // 
https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
+  val cmd = Array("bash", "-c", "echo $PPID")
+  val out2 = Utils.executeAndGetOutput(cmd)
+  val pid = Integer.parseInt(out2.split("\n")(0))
+  return pid;
+}
+catch {
+  case e: SparkException =>
+logWarning("Exception when trying to compute process tree." +
+" As a result reporting of ProcessTree metrics is stopped", e)
+isAvailable = false
+return -1
+}
+  }
+
+  private def computePageSize(): Long = {
+if (testing) {
+  return 0;
+}
+try {
+  val cmd = Array("getconf", "PAGESIZE")
+  val out = Utils.executeAndGetOutput(cmd)
+  return Integer.parseInt(out.split("\n")(0))
+} catch {
+  case e: Exception => logWarning("Exception when trying to compute 
pagesize, as a" +
+" result reporting of ProcessTree metrics is stopped")
+isAvailable = false
+return 0
+}
+  }
+
+  private def computeProcessTree(): Set[Int] = {
+if (!isAvailable || testing) {
+  return Set()
+}
+var ptree: Set[Int] = Set()
+ptree += pid
+val queue = mutable.Queue.empt

[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r234344763
  
--- Diff: 
core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala ---
@@ -19,25 +19,43 @@ package org.apache.spark.metrics
 import java.lang.management.{BufferPoolMXBean, ManagementFactory}
 import javax.management.ObjectName
 
+import scala.collection.mutable
+
+import org.apache.spark.executor.ProcfsBasedSystems
 import org.apache.spark.memory.MemoryManager
 
 /**
  * Executor metric types for executor-level metrics stored in 
ExecutorMetrics.
  */
 sealed trait ExecutorMetricType {
-  private[spark] def getMetricValue(memoryManager: MemoryManager): Long
-  private[spark] val name = 
getClass().getName().stripSuffix("$").split("""\.""").last
+  private[spark] def getMetricValues(memoryManager: MemoryManager): 
Array[Long] = {
+new Array[Long](0)
+  }
+  private[spark] def names: Seq[String] = Seq()
+}
+
+sealed trait SingleValueExecutorMetricType extends ExecutorMetricType {
+  override private[spark] def names = Seq(getClass().getName().
+stripSuffix("$").split("""\.""").last)
--- End diff --

nit: if the method def is multiline, enclose in braces, and the body should 
start on a newline.
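
i.e. roughly:

```scala
override private[spark] def names = {
  Seq(getClass().getName().stripSuffix("$").split("""\.""").last)
}
```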


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r234347011
  
--- Diff: 
core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala ---
@@ -47,18 +65,39 @@ private[spark] abstract class 
MBeanExecutorMetricType(mBeanName: String)
   }
 }
 
-case object JVMHeapMemory extends ExecutorMetricType {
+case object JVMHeapMemory extends SingleValueExecutorMetricType {
   override private[spark] def getMetricValue(memoryManager: 
MemoryManager): Long = {
 ManagementFactory.getMemoryMXBean.getHeapMemoryUsage().getUsed()
   }
 }
 
-case object JVMOffHeapMemory extends ExecutorMetricType {
+case object JVMOffHeapMemory extends SingleValueExecutorMetricType {
   override private[spark] def getMetricValue(memoryManager: 
MemoryManager): Long = {
 ManagementFactory.getMemoryMXBean.getNonHeapMemoryUsage().getUsed()
   }
 }
 
+case object ProcessTreeMetrics extends ExecutorMetricType {
+  override val names = Seq(
+"ProcessTreeJVMVMemory",
+"ProcessTreeJVMRSSMemory",
+"ProcessTreePythonVMemory",
+"ProcessTreePythonRSSMemory",
+"ProcessTreeOtherVMemory",
+"ProcessTreeOtherRSSMemory")
+  override private[spark] def getMetricValues(memoryManager: 
MemoryManager): Array[Long] = {
--- End diff --

nit: blank line before this method


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r234340033
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+private[spark] case class ProcfsBasedSystemsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") 
extends Logging {
+  val procfsStatFile = "stat"
+  val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+  var pageSize = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
+  private val pid = computePid()
+
+  // var allMetrics: ProcfsBasedSystemsMetrics = 
ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0)
+
+  computeProcessTree()
+
+  private lazy val isProcfsAvailable: Boolean = {
+if (testing) {
+   true
+}
+else {
+  var procDirExists = true
+  try {
+if (!Files.exists(Paths.get(procfsDir))) {
+  procDirExists = false
+}
+  }
+  catch {
+case f: IOException =>
+  logWarning("It seems that procfs isn't supported", f)
+  procDirExists = false
+  }
+  val shouldLogStageExecutorMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
+  val shouldLogStageExecutorProcessTreeMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
+  procDirExists && shouldLogStageExecutorProcessTreeMetrics && 
shouldLogStageExecutorMetrics
+}
+  }
+
+  private def computePid(): Int = {
+if (!isAvailable || testing) {
+  return -1;
+}
+try {
+  // This can be simplified in java9:
+  // 
https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
+  val cmd = Array("bash", "-c", "echo $PPID")
+  val out2 = Utils.executeAndGetOutput(cmd)
+  val pid = Integer.parseInt(out2.split("\n")(0))
+  return pid;
+}
+catch {
+  case e: SparkException =>
+logWarning("Exception when trying to compute process tree." +
+" As a result reporting of ProcessTree metrics is stopped", e)
+isAvailable = false
+return -1
+}
+  }
+
+  private def computePageSize(): Long = {
+if (testing) {
+  return 0;
+}
+try {
+  val cmd = Array("getconf", "PAGESIZE")
+  val out = Utils.executeAndGetOutput(cmd)
+  return Integer.parseInt(out.split("\n")(0))
+} catch {
+  case e: Exception => logWarning("Exception when trying to compute 
pagesize, as a" +
+" result reporting of ProcessTree metrics is stopped")
--- End diff --

nit: nothing else on line after `=>`, and extra indent for second line of 
log msg
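
i.e. (formatting only; logic unchanged):

```scala
case e: Exception =>
  logWarning("Exception when trying to compute pagesize, as a" +
    " result reporting of ProcessTree metrics is stopped")
  isAvailable = false
  return 0
```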


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r234337384
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+private[spark] case class ProcfsBasedSystemsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") 
extends Logging {
+  val procfsStatFile = "stat"
+  val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+  var pageSize = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
+  private val pid = computePid()
+
+  // var allMetrics: ProcfsBasedSystemsMetrics = 
ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0)
--- End diff --

delete


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r234342135
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+private[spark] case class ProcfsBasedSystemsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") 
extends Logging {
+  val procfsStatFile = "stat"
+  val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+  var pageSize = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
+  private val pid = computePid()
+
+  // var allMetrics: ProcfsBasedSystemsMetrics = 
ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0)
+
+  computeProcessTree()
+
+  private lazy val isProcfsAvailable: Boolean = {
+if (testing) {
+   true
+}
+else {
+  var procDirExists = true
+  try {
+if (!Files.exists(Paths.get(procfsDir))) {
+  procDirExists = false
+}
+  }
+  catch {
+case f: IOException =>
+  logWarning("It seems that procfs isn't supported", f)
+  procDirExists = false
+  }
+  val shouldLogStageExecutorMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
+  val shouldLogStageExecutorProcessTreeMetrics =
+SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
+  procDirExists && shouldLogStageExecutorProcessTreeMetrics && 
shouldLogStageExecutorMetrics
+}
+  }
+
+  private def computePid(): Int = {
+if (!isAvailable || testing) {
+  return -1;
+}
+try {
+  // This can be simplified in java9:
+  // 
https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
+  val cmd = Array("bash", "-c", "echo $PPID")
+  val out2 = Utils.executeAndGetOutput(cmd)
+  val pid = Integer.parseInt(out2.split("\n")(0))
+  return pid;
+}
+catch {
+  case e: SparkException =>
+logWarning("Exception when trying to compute process tree." +
+" As a result reporting of ProcessTree metrics is stopped", e)
+isAvailable = false
+return -1
+}
+  }
+
+  private def computePageSize(): Long = {
+if (testing) {
+  return 0;
+}
+try {
+  val cmd = Array("getconf", "PAGESIZE")
+  val out = Utils.executeAndGetOutput(cmd)
+  return Integer.parseInt(out.split("\n")(0))
+} catch {
+  case e: Exception => logWarning("Exception when trying to compute 
pagesize, as a" +
+" result reporting of ProcessTree metrics is stopped")
+isAvailable = false
+return 0
+}
+  }
+
+  private def computeProcessTree(): Set[Int] = {
+if (!isAvailable || testing) {
+  return Set()
+}
+var ptree: Set[Int] = Set()
+ptree += pid
+val queue = mutable.Queue.empt

[GitHub] spark issue #23041: [SPARK-26069][TESTS]Fix flaky test: RpcIntegrationSuite....

2018-11-15 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/23041
  
Failure from another flaky test, SPARK-24153.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23041: [SPARK-26069][TESTS]Fix flaky test: RpcIntegrationSuite....

2018-11-15 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/23041
  
lgtm


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-13 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r233255758
  
--- Diff: 
core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala ---
@@ -19,18 +19,31 @@ package org.apache.spark.metrics
 import java.lang.management.{BufferPoolMXBean, ManagementFactory}
 import javax.management.ObjectName
 
+import scala.collection.mutable
+
+import org.apache.spark.executor.ProcfsBasedSystems
 import org.apache.spark.memory.MemoryManager
 
 /**
  * Executor metric types for executor-level metrics stored in 
ExecutorMetrics.
  */
 sealed trait ExecutorMetricType {
-  private[spark] def getMetricValue(memoryManager: MemoryManager): Long
-  private[spark] val name = 
getClass().getName().stripSuffix("$").split("""\.""").last
+  private[spark] def getMetricValue(memoryManager: MemoryManager): Long = 0
+  private[spark] def getMetricSet(memoryManager: MemoryManager): 
Array[Long] = {
+new Array[Long](0)
+  }
+  private[spark] def names = 
Seq(getClass().getName().stripSuffix("$").split("""\.""").last)
--- End diff --

I think I'm suggesting something slightly different.  There would be two 
traits, but they'd be related.   `trait ExecutorMetricType` would allow 
multiple values, as you have it here.  Everything that needs to interact with 
the metrics would use that interface.  That would define `getMetricValues()` 
etc. (plural).

Then you'd have another `trait SingleValueMetricType` (or something) which 
would have abstract methods like `getMetricValue()` singular, and it would 
provide a definition for the plural methods like

```scala
def getMetricValues(): Array[Long] = {
  val metrics = new Array[Long](1)
  metrics(0) = getMetricValue()
  metrics
}
```

In Java terms, you could think of `ExecutorMetricType` as an interface and 
`SingleValueMetricType` as an abstract class.  `SingleValueMetricType` is just 
a convenience that some implementations can take advantage of to avoid repeating 
code.
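
Putting the two together, a hedged sketch of the shape being suggested (signatures simplified,
not the exact patch):

```scala
sealed trait ExecutorMetricType {
  private[spark] def names: Seq[String]
  private[spark] def getMetricValues(memoryManager: MemoryManager): Array[Long]
}

sealed trait SingleValueExecutorMetricType extends ExecutorMetricType {
  private[spark] def getMetricValue(memoryManager: MemoryManager): Long

  override private[spark] def names: Seq[String] =
    Seq(getClass().getName().stripSuffix("$").split("""\.""").last)

  override private[spark] def getMetricValues(memoryManager: MemoryManager): Array[Long] =
    Array(getMetricValue(memoryManager))
}
```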


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #23021: [SPARK-26032][PYTHON] Break large sql/tests.py files int...

2018-11-13 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/23021
  
Thanks for working on this @HyukjinKwon. Did you test on Python 3 as well?

I'm surprised elapsed time is the same -- I would have expected it to be 
faster as the tests could run in parallel.

Aside: does anybody have a good recommendation for tools to help with 
viewing a diff like this?  I feel like I want something really simple: a tool 
to show me where each original test got moved to, and highlight what was 
deleted or added.  I tried p4merge and Beyond Compare, but neither was helpful 
(or I'm using them wrong).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-09 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r231921807
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala ---
@@ -69,11 +67,10 @@ class ExecutorMetrics private[spark] extends 
Serializable {
*/
   private[spark] def compareAndUpdatePeakValues(executorMetrics: 
ExecutorMetrics): Boolean = {
 var updated = false
-
-(0 until ExecutorMetricType.values.length).foreach { idx =>
-   if (executorMetrics.metrics(idx) > metrics(idx)) {
+ExecutorMetricType.definedMetricsAndOffset.map {m =>
--- End diff --

Space after `{`.  Also a bit clearer if you use pattern matching: `.map { 
case (_, idx) =>`
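
e.g. just the call-site shape (body as in the patch):

```scala
ExecutorMetricType.definedMetricsAndOffset.map { case (_, idx) =>
  // ... existing comparison and update logic ...
}
```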


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-09 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r232224322
  
--- Diff: 
core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala ---
@@ -95,10 +148,18 @@ private[spark] object ExecutorMetricType {
 OnHeapUnifiedMemory,
 OffHeapUnifiedMemory,
 DirectPoolMemory,
-MappedPoolMemory
+MappedPoolMemory,
+ProcessTreeMetrics
   )
 
-  // Map of executor metric type to its index in values.
-  val metricIdxMap =
-Map[ExecutorMetricType, Int](ExecutorMetricType.values.zipWithIndex: 
_*)
+  var definedMetricsAndOffset = mutable.LinkedHashMap.empty[String, Int]
+  var numberOfMetrics = 0
--- End diff --

you only need this mutable during initialization, so to convey its 
eventually final value, you can move the mutability to inside a block:

```scala
val (metricToOffset, numMetrics) = {
  var n = 0
  val _metricToOffset = mutable.LinkedHashMap.empty[String, Int]
  ...
   (_metricToOffset, n)
}
```
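
Fleshed out, that might look something like this (the loop body is an assumption based on
`metricGetters` and the per-metric `names` used elsewhere in the patch):

```scala
val (metricToOffset, numMetrics) = {
  var n = 0
  val _metricToOffset = mutable.LinkedHashMap.empty[String, Int]
  metricGetters.foreach { metric =>
    metric.names.foreach { name =>
      _metricToOffset(name) = n
      n += 1
    }
  }
  (_metricToOffset, n)
}
```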


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-09 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r232221648
  
--- Diff: 
core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala ---
@@ -19,18 +19,31 @@ package org.apache.spark.metrics
 import java.lang.management.{BufferPoolMXBean, ManagementFactory}
 import javax.management.ObjectName
 
+import scala.collection.mutable
+
+import org.apache.spark.executor.ProcfsBasedSystems
 import org.apache.spark.memory.MemoryManager
 
 /**
  * Executor metric types for executor-level metrics stored in 
ExecutorMetrics.
  */
 sealed trait ExecutorMetricType {
-  private[spark] def getMetricValue(memoryManager: MemoryManager): Long
-  private[spark] val name = 
getClass().getName().stripSuffix("$").split("""\.""").last
+  private[spark] def getMetricValue(memoryManager: MemoryManager): Long = 0
--- End diff --

this function is unused


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...

2018-11-09 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22612#discussion_r232220878
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala ---
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+private[spark] case class ProcfsBasedSystemsMetrics(
+jvmVmemTotal: Long,
+jvmRSSTotal: Long,
+pythonVmemTotal: Long,
+pythonRSSTotal: Long,
+otherVmemTotal: Long,
+otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") 
extends Logging {
+  val procfsStatFile = "stat"
+  val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+  var pageSize = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
+  private val pid = computePid()
+  private var ptree = mutable.Map[ Int, Set[Int]]()
+
+  var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 
0, 0, 0, 0, 0)
+
+  computeProcessTree()
+
+  private def isProcfsAvailable: Boolean = {
+if (testing) {
+  return true
+}
+try {
+  if (!Files.exists(Paths.get(procfsDir))) {
+return false
+  }
+}
+catch {
+  case f: FileNotFoundException => return false
+}
+val shouldLogStageExecutorMetrics =
+  SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
+val shouldLogStageExecutorProcessTreeMetrics =
+  SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
+shouldLogStageExecutorProcessTreeMetrics && 
shouldLogStageExecutorMetrics
+  }
+
+  private def computePid(): Int = {
+if (!isAvailable || testing) {
+  return -1;
+}
+try {
+  // This can be simplified in java9:
+  // 
https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
+  val cmd = Array("bash", "-c", "echo $PPID")
+  val length = 10
+  val out2 = Utils.executeAndGetOutput(cmd)
+  val pid = Integer.parseInt(out2.split("\n")(0))
+  return pid;
+}
+catch {
+  case e: SparkException => logWarning("Exception when trying to 
compute process tree." +
+" As a result reporting of ProcessTree metrics is stopped", e)
+isAvailable = false
+return -1
+}
+  }
+
+  private def computePageSize(): Long = {
+if (testing) {
+  return 0;
+}
+try {
+  val cmd = Array("getconf", "PAGESIZE")
+  val out2 = Utils.executeAndGetOutput(cmd)
+  return Integer.parseInt(out2.split("\n")(0))
+} catch {
+  case e: Exception => logWarning("Exception when trying to compute 
pagesize, as a" +
+" result reporting of ProcessTree metrics is stopped")
+isAvailable = false
+return 0
+}
+  }
+
+  private def computeProcessTree(): Unit = {
+if (!isAvailable || testing) {
+  return
+}
+ptree = mutable.Map[ Int, Set[Int]]()
+val queue = mutable.Queue.empty[Int]
+queue += pid
+while( !queue.isEmpty ) {
+  val p = queue.dequeue()
+  val c = getChildPids(p)
+  if(!c.isEmpty) {
   
