Github user mccheah commented on a diff in the pull request:
https://github.com/apache/spark/pull/21221#discussion_r206331772
--- Diff: core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.metrics
+
+import java.lang.management.{BufferPoolMXBean, ManagementFactory}
+import javax.management.ObjectName
+
+import org.apache.spark.memory.MemoryManager
+
+/**
+ * Executor metric types for executor-level metrics stored in ExecutorMetrics.
+ */
+sealed trait ExecutorMetricType {
+ private[spark] def getMetricValue(memoryManager: MemoryManager): Long
--- End diff ---
A few minor flags with this design:
- When the memory manager is passed to a bean-sourced metric, it ends up unused.
- There's an inconsistency in that the metric types that use the memory manager accept it as an argument, while the metric types that use the memory beans construct those beans as singletons.

The following is a proposal that doesn't have the above inconsistencies, but is a bit more complex. Open to discussion on whether the extra complexity is worthwhile here.
```
sealed trait ExecutorMetricType[T] {
  private[spark] def getMetricValue(src: T): Long
  private[spark] def name: String
}

case class MBeanMetricType(mbeanName: String)
    extends ExecutorMetricType[BufferPoolMXBean] {
  // Derive some name from the bean name, most likely; the raw bean name is
  // just a placeholder here.
  override def name: String = mbeanName
  override def getMetricValue(bean: BufferPoolMXBean): Long = {
    bean.getMemoryUsed
  }
}

case object OnHeapExecutionMemory extends ExecutorMetricType[MemoryManager] {
  override def name: String = "OnHeapExecution"
  override def getMetricValue(memoryManager: MemoryManager): Long =
    memoryManager.onHeapExecutionMemoryUsed
}

case object OffHeapExecutionMemory extends ExecutorMetricType[MemoryManager] {
  override def name: String = "OffHeapExecution"
  override def getMetricValue(memoryManager: MemoryManager): Long =
    memoryManager.offHeapExecutionMemoryUsed
}

private[spark] object ExecutorMetricType {
  // Add all subtypes of ExecutorMetricType[MemoryManager] here.
  val memoryManagerMetrics = IndexedSeq(
    OnHeapExecutionMemory,
    OffHeapExecutionMemory)

  private val BUFFER_POOL_BEAN_NAME = "java.nio:type=BufferPool,name=direct"
  private val MAPPED_POOL_BEAN_NAME = "java.nio:type=BufferPool,name=mapped"

  // Pair each bean-sourced metric type with the bean it reads from.
  val mbeanMetrics = IndexedSeq(
    (MBeanMetricType(BUFFER_POOL_BEAN_NAME), getBean(BUFFER_POOL_BEAN_NAME)),
    (MBeanMetricType(MAPPED_POOL_BEAN_NAME), getBean(MAPPED_POOL_BEAN_NAME)))

  // Just for length? There might be a simpler way.
  val values = memoryManagerMetrics ++ mbeanMetrics.map(_._1)

  def getMetricsSummary(memoryManager: MemoryManager): ExecutorMetrics = {
    val allMetrics = new Array[Long](values.length)
    memoryManagerMetrics.zipWithIndex.foreach { case (metric, index) =>
      allMetrics(index) = metric.getMetricValue(memoryManager)
    }
    mbeanMetrics.zipWithIndex.foreach { case ((metric, bean), index) =>
      allMetrics(memoryManagerMetrics.length + index) = metric.getMetricValue(bean)
    }
    new ExecutorMetrics(allMetrics)
  }

  private def getBean(beanName: String): BufferPoolMXBean = {
    ManagementFactory.newPlatformMXBeanProxy(
      ManagementFactory.getPlatformMBeanServer,
      new ObjectName(beanName).toString,
      classOf[BufferPoolMXBean])
  }
}
```
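For context, the call site would then only need to thread through the memory manager; something like this (the surrounding names here are illustrative, not from the PR):

```
// Hypothetical call site, e.g. the executor heartbeat path; `memoryManager`
// is whatever MemoryManager instance the executor already holds.
val snapshot: ExecutorMetrics =
  ExecutorMetricType.getMetricsSummary(memoryManager)
```

On the `values` question above: if it's only needed for the total length, `memoryManagerMetrics.length + mbeanMetrics.length` would sidestep concatenating sequences with different type parameters.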