[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21221 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21221#discussion_r215899049

    --- Diff: core/src/main/java/org/apache/spark/SparkFirehoseListener.java ---
    @@ -103,6 +103,12 @@ public final void onExecutorMetricsUpdate(
     onEvent(executorMetricsUpdate);
     }
    + @Override
    + public final void onStageExecutorMetrics(
    + SparkListenerStageExecutorMetrics executorMetrics) {
    --- End diff --

    nit: remove extra spaces for better indent
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21221#discussion_r210691276

    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -69,6 +69,11 @@ package object config {
           .bytesConf(ByteUnit.KiB)
           .createWithDefaultString("100k")
    +  private[spark] val EVENT_LOG_STAGE_EXECUTOR_METRICS =
    +    ConfigBuilder("spark.eventLog.logStageExecutorMetrics.enabled")
    +      .booleanConf
    +      .createWithDefault(true)
    --- End diff --

    That would be safer. I'll change it to false, and we can change it to true after people have had a chance to test it out.
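To illustrate what an off-by-default flag means for callers, here is a minimal sketch in plain Scala with no Spark dependency. Only the key name comes from the diff above; `StageMetricsFlagSketch` and `stageExecutorMetricsEnabled` are hypothetical names, and the default of `false` reflects the change proposed in this thread rather than any confirmed release behaviour.

```scala
object StageMetricsFlagSketch {
  // Key name taken from the diff quoted above.
  val Key = "spark.eventLog.logStageExecutorMetrics.enabled"

  // Hypothetical helper: resolve the flag from plain key/value settings,
  // falling back to the proposed default of false when the key is absent.
  def stageExecutorMetricsEnabled(settings: Map[String, String]): Boolean =
    settings.get(Key).map(_.trim.toBoolean).getOrElse(false)
}
```

With this shape, users who want the new events have to opt in explicitly, which is the "safe side" the reviewers are asking for.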
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21221#discussion_r210690505

    --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
    @@ -216,8 +217,7 @@ private[spark] class Executor(
       def stop(): Unit = {
         env.metricsSystem.report()
    -    heartbeater.shutdown()
    -    heartbeater.awaitTermination(10, TimeUnit.SECONDS)
    +    heartbeater.stop()
    --- End diff --

    Added.
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user felixcheung commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21221#discussion_r210492311

    --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
    @@ -216,8 +217,7 @@ private[spark] class Executor(
       def stop(): Unit = {
         env.metricsSystem.report()
    -    heartbeater.shutdown()
    -    heartbeater.awaitTermination(10, TimeUnit.SECONDS)
    +    heartbeater.stop()
    --- End diff --

    future: `try {} catch { case NonFatal(e)`?
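The `NonFatal` suggestion can be sketched roughly as follows, in plain Scala with no Spark dependency. `Stoppable` and `stopQuietly` are hypothetical names standing in for the heartbeater and its caller; this is one possible shape, not the code that was eventually committed.

```scala
import scala.util.control.NonFatal

object SafeStopSketch {
  // Hypothetical stand-in for the heartbeater; only stop() matters here.
  trait Stoppable { def stop(): Unit }

  // Runs stop() and reports whether it completed. Non-fatal exceptions are
  // caught so that later shutdown steps can still run; fatal errors such as
  // OutOfMemoryError are not matched by NonFatal and still propagate.
  def stopQuietly(component: Stoppable): Boolean =
    try {
      component.stop()
      true
    } catch {
      case NonFatal(e) =>
        Console.err.println(s"Ignoring failure while stopping: ${e.getMessage}")
        false
    }
}
```

The point of `NonFatal` over a bare `case e: Throwable` is that it leaves JVM-level failures and interrupts alone.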
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user felixcheung commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21221#discussion_r210492513

    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -69,6 +69,11 @@ package object config {
           .bytesConf(ByteUnit.KiB)
           .createWithDefaultString("100k")
    +  private[spark] val EVENT_LOG_STAGE_EXECUTOR_METRICS =
    +    ConfigBuilder("spark.eventLog.logStageExecutorMetrics.enabled")
    +      .booleanConf
    +      .createWithDefault(true)
    --- End diff --

    should this be "false" for now until we could test this out more, just to be on the safe side?
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user mccheah commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21221#discussion_r209773404

    --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
    @@ -669,6 +686,31 @@ private[spark] class AppStatusListener(
             }
           }
         }
    +
    +    // check if there is a new peak value for any of the executor level memory metrics
    +    // for the live UI. SparkListenerExecutorMetricsUpdate events are only processed
    +    // for the live UI.
    +    event.executorUpdates.foreach { updates: ExecutorMetrics =>
    +      liveExecutors.get(event.execId).foreach { exec: LiveExecutor =>
    +        if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(updates)) {
    +          maybeUpdate(exec, now)
    +        }
    +      }
    +    }
    +  }
    +
    +  override def onStageExecutorMetrics(executorMetrics: SparkListenerStageExecutorMetrics): Unit = {
    +    val now = System.nanoTime()
    --- End diff --

    Yup that's fine.
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21221#discussion_r209772320

    --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
    @@ -669,6 +686,31 @@ private[spark] class AppStatusListener(
             }
           }
         }
    +
    +    // check if there is a new peak value for any of the executor level memory metrics
    +    // for the live UI. SparkListenerExecutorMetricsUpdate events are only processed
    +    // for the live UI.
    +    event.executorUpdates.foreach { updates: ExecutorMetrics =>
    +      liveExecutors.get(event.execId).foreach { exec: LiveExecutor =>
    +        if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(updates)) {
    +          maybeUpdate(exec, now)
    +        }
    +      }
    +    }
    +  }
    +
    +  override def onStageExecutorMetrics(executorMetrics: SparkListenerStageExecutorMetrics): Unit = {
    +    val now = System.nanoTime()
    --- End diff --

    The rest of the file is using System.nanoTime() -- it seems more consistent to keep it the same. Clock has getTimeMillis(), although we could always multiply by 1000, not sure if the precision would matter.
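On the unit question raised here: a millisecond is 1,000,000 nanoseconds (a factor of 1000 only gets to microseconds), so a conversion helper is less error-prone than a hand-written multiplier. A small sketch using `java.util.concurrent.TimeUnit`; the object and method names are hypothetical.

```scala
import java.util.concurrent.TimeUnit

object TimeUnitsSketch {
  // Converts a reading in milliseconds to nanoseconds.
  // One millisecond is 1,000,000 nanoseconds.
  def millisToNanos(millis: Long): Long = TimeUnit.MILLISECONDS.toNanos(millis)
}
```

Note also that `System.nanoTime()` counts from an arbitrary origin, so a converted wall-clock reading is only meaningful when compared with other readings from the same source, not with `System.nanoTime()` values.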
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21221#discussion_r209771443

    --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
    @@ -669,6 +686,31 @@ private[spark] class AppStatusListener(
             }
           }
         }
    +
    +    // check if there is a new peak value for any of the executor level memory metrics
    +    // for the live UI. SparkListenerExecutorMetricsUpdate events are only processed
    +    // for the live UI.
    +    event.executorUpdates.foreach { updates: ExecutorMetrics =>
    --- End diff --

    No, these don't need to be explicitly defined.
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r209770605 --- Diff: core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.metrics + +import java.lang.management.{BufferPoolMXBean, ManagementFactory} +import javax.management.ObjectName + +import org.apache.spark.memory.MemoryManager + +/** + * Executor metric types for executor-level metrics stored in ExecutorMetrics. + */ +sealed trait ExecutorMetricType { + private[spark] def getMetricValue(memoryManager: MemoryManager): Long + private[spark] val name = getClass().getName().stripSuffix("$").split("""\.""").last +} + +private[spark] abstract class MemoryManagerExecutorMetricType( +f: MemoryManager => Long) extends ExecutorMetricType { + override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = { +f(memoryManager) + } +} + +private[spark]abstract class MBeanExecutorMetricType(mBeanName: String) --- End diff -- Added. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r209770476 --- Diff: core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.executor + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.metrics.ExecutorMetricType + +/** + * :: DeveloperApi :: + * Metrics tracked for executors and the driver. + * + * Executor-level metrics are sent from each executor to the driver as part of the Heartbeat. + */ +@DeveloperApi +class ExecutorMetrics private[spark] extends Serializable { + + // Metrics are indexed by MetricGetter.values + private val metrics = new Array[Long](ExecutorMetricType.values.length) + + // the first element is initialized to -1, indicating that the values for the array + // haven't been set yet. + metrics(0) = -1 + + /** Returns the value for the specified metricType. */ + def getMetricValue(metricType: ExecutorMetricType): Long = { +metrics(ExecutorMetricType.metricIdxMap(metricType)) + } + + /** Returns true if the values for the metrics have been set, false otherwise. 
*/ + def isSet(): Boolean = metrics(0) > -1 + + private[spark] def this(metrics: Array[Long]) { +this() +Array.copy(metrics, 0, this.metrics, 0, Math.min(metrics.size, this.metrics.size)) + } + + /** + * Constructor: create the ExecutorMetrics with the values specified. + * + * @param executorMetrics map of executor metric name to value + */ + private[spark] def this(executorMetrics: Map[String, Long]) { +this() +(0 until ExecutorMetricType.values.length).foreach { idx => + metrics(idx) = executorMetrics.getOrElse(ExecutorMetricType.values(idx).name, 0L) +} + } + + /** + * Compare the specified executor metrics values with the current executor metric values, + * and update the value for any metrics where the new value for the metric is larger. + * + * @param executorMetrics the executor metrics to compare + * @return if there is a new peak value for any metric + */ + private[spark] def compareAndUpdatePeakValues(executorMetrics: ExecutorMetrics): Boolean = { +var updated: Boolean = false --- End diff -- Removed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21221#discussion_r209770404

    --- Diff: core/src/main/scala/org/apache/spark/memory/MemoryManager.scala ---
    @@ -180,6 +180,26 @@ private[spark] abstract class MemoryManager(
         onHeapStorageMemoryPool.memoryUsed + offHeapStorageMemoryPool.memoryUsed
       }
    +  /**
    +   * On heap execution memory currently in use, in bytes.
    +   */
    +  final def onHeapExecutionMemoryUsed: Long = onHeapExecutionMemoryPool.memoryUsed
    --- End diff --

    That's true -- I'll change the methods to synchronized.
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r209770440 --- Diff: core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.executor + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.metrics.ExecutorMetricType + +/** + * :: DeveloperApi :: + * Metrics tracked for executors and the driver. + * + * Executor-level metrics are sent from each executor to the driver as part of the Heartbeat. + */ +@DeveloperApi +class ExecutorMetrics private[spark] extends Serializable { + + // Metrics are indexed by MetricGetter.values + private val metrics = new Array[Long](ExecutorMetricType.values.length) + + // the first element is initialized to -1, indicating that the values for the array + // haven't been set yet. + metrics(0) = -1 + + /** Returns the value for the specified metricType. */ + def getMetricValue(metricType: ExecutorMetricType): Long = { +metrics(ExecutorMetricType.metricIdxMap(metricType)) + } + + /** Returns true if the values for the metrics have been set, false otherwise. 
*/ + def isSet(): Boolean = metrics(0) > -1 + + private[spark] def this(metrics: Array[Long]) { +this() +Array.copy(metrics, 0, this.metrics, 0, Math.min(metrics.size, this.metrics.size)) + } + + /** + * Constructor: create the ExecutorMetrics with the values specified. + * + * @param executorMetrics map of executor metric name to value + */ + private[spark] def this(executorMetrics: Map[String, Long]) { +this() +(0 until ExecutorMetricType.values.length).foreach { idx => + metrics(idx) = executorMetrics.getOrElse(ExecutorMetricType.values(idx).name, 0L) +} + } + + /** + * Compare the specified executor metrics values with the current executor metric values, + * and update the value for any metrics where the new value for the metric is larger. + * + * @param executorMetrics the executor metrics to compare + * @return if there is a new peak value for any metric + */ + private[spark] def compareAndUpdatePeakValues(executorMetrics: ExecutorMetrics): Boolean = { +var updated: Boolean = false + +(0 until ExecutorMetricType.values.length).foreach { idx => + if ( executorMetrics.metrics(idx) > metrics(idx)) { --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user mccheah commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21221#discussion_r209717523

    --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
    @@ -669,6 +686,31 @@ private[spark] class AppStatusListener(
             }
           }
         }
    +
    +    // check if there is a new peak value for any of the executor level memory metrics
    +    // for the live UI. SparkListenerExecutorMetricsUpdate events are only processed
    +    // for the live UI.
    +    event.executorUpdates.foreach { updates: ExecutorMetrics =>
    +      liveExecutors.get(event.execId).foreach { exec: LiveExecutor =>
    +        if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(updates)) {
    +          maybeUpdate(exec, now)
    +        }
    +      }
    +    }
    +  }
    +
    +  override def onStageExecutorMetrics(executorMetrics: SparkListenerStageExecutorMetrics): Unit = {
    +    val now = System.nanoTime()
    --- End diff --

    Should we use a `Clock` instance here for testing?
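For readers unfamiliar with the idea behind this question, here is a rough sketch of an injectable time source and a manually advanced test double. It is plain Scala and deliberately does not use Spark's own `Clock`; `NanoClock`, `ManualNanoClock`, and `Throttle` are hypothetical names invented for the illustration.

```scala
object ClockSketch {
  // Hypothetical minimal time source; not Spark's Clock trait.
  trait NanoClock { def nanoTime(): Long }

  // Production implementation backed by the JVM's monotonic timer.
  object SystemNanoClock extends NanoClock {
    def nanoTime(): Long = System.nanoTime()
  }

  // Test double whose time only moves when advance() is called.
  final class ManualNanoClock(private var now: Long = 0L) extends NanoClock {
    def nanoTime(): Long = now
    def advance(nanos: Long): Unit = now += nanos
  }

  // Hypothetical listener-like class that rate-limits updates
  // using whichever clock it was given.
  final class Throttle(clock: NanoClock, minIntervalNanos: Long) {
    private var lastUpdate: Option[Long] = None

    def shouldUpdate(): Boolean = {
      val now = clock.nanoTime()
      val due = lastUpdate.forall(last => now - last >= minIntervalNanos)
      if (due) lastUpdate = Some(now)
      due
    }
  }
}
```

A test can then construct `Throttle` with a `ManualNanoClock` and step time forward deterministically, instead of sleeping or depending on the real timer.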
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user mccheah commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21221#discussion_r209715001

    --- Diff: core/src/main/scala/org/apache/spark/memory/MemoryManager.scala ---
    @@ -180,6 +180,26 @@ private[spark] abstract class MemoryManager(
         onHeapStorageMemoryPool.memoryUsed + offHeapStorageMemoryPool.memoryUsed
       }
    +  /**
    +   * On heap execution memory currently in use, in bytes.
    +   */
    +  final def onHeapExecutionMemoryUsed: Long = onHeapExecutionMemoryPool.memoryUsed
    +
    +  /**
    +   * Off heap execution memory currently in use, in bytes.
    +   */
    +  final def offHeapExecutionMemoryUsed: Long = offHeapExecutionMemoryPool.memoryUsed
    --- End diff --

    `synchronized` here also and in the below two methods.
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r209715208 --- Diff: core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.metrics + +import java.lang.management.{BufferPoolMXBean, ManagementFactory} +import javax.management.ObjectName + +import org.apache.spark.memory.MemoryManager + +/** + * Executor metric types for executor-level metrics stored in ExecutorMetrics. + */ +sealed trait ExecutorMetricType { + private[spark] def getMetricValue(memoryManager: MemoryManager): Long + private[spark] val name = getClass().getName().stripSuffix("$").split("""\.""").last +} + +private[spark] abstract class MemoryManagerExecutorMetricType( +f: MemoryManager => Long) extends ExecutorMetricType { + override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = { +f(memoryManager) + } +} + +private[spark]abstract class MBeanExecutorMetricType(mBeanName: String) --- End diff -- Put a space after `[spark]`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user mccheah commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21221#discussion_r209716819

    --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
    @@ -669,6 +686,31 @@ private[spark] class AppStatusListener(
             }
           }
         }
    +
    +    // check if there is a new peak value for any of the executor level memory metrics
    +    // for the live UI. SparkListenerExecutorMetricsUpdate events are only processed
    +    // for the live UI.
    +    event.executorUpdates.foreach { updates: ExecutorMetrics =>
    --- End diff --

    Does the type need to be explicitly defined here and in the next line?
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r209714796 --- Diff: core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.executor + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.metrics.ExecutorMetricType + +/** + * :: DeveloperApi :: + * Metrics tracked for executors and the driver. + * + * Executor-level metrics are sent from each executor to the driver as part of the Heartbeat. + */ +@DeveloperApi +class ExecutorMetrics private[spark] extends Serializable { + + // Metrics are indexed by MetricGetter.values + private val metrics = new Array[Long](ExecutorMetricType.values.length) + + // the first element is initialized to -1, indicating that the values for the array + // haven't been set yet. + metrics(0) = -1 + + /** Returns the value for the specified metricType. */ + def getMetricValue(metricType: ExecutorMetricType): Long = { +metrics(ExecutorMetricType.metricIdxMap(metricType)) + } + + /** Returns true if the values for the metrics have been set, false otherwise. 
*/ + def isSet(): Boolean = metrics(0) > -1 + + private[spark] def this(metrics: Array[Long]) { +this() +Array.copy(metrics, 0, this.metrics, 0, Math.min(metrics.size, this.metrics.size)) + } + + /** + * Constructor: create the ExecutorMetrics with the values specified. + * + * @param executorMetrics map of executor metric name to value + */ + private[spark] def this(executorMetrics: Map[String, Long]) { +this() +(0 until ExecutorMetricType.values.length).foreach { idx => + metrics(idx) = executorMetrics.getOrElse(ExecutorMetricType.values(idx).name, 0L) +} + } + + /** + * Compare the specified executor metrics values with the current executor metric values, + * and update the value for any metrics where the new value for the metric is larger. + * + * @param executorMetrics the executor metrics to compare + * @return if there is a new peak value for any metric + */ + private[spark] def compareAndUpdatePeakValues(executorMetrics: ExecutorMetrics): Boolean = { +var updated: Boolean = false + +(0 until ExecutorMetricType.values.length).foreach { idx => + if ( executorMetrics.metrics(idx) > metrics(idx)) { --- End diff -- Nit: No space after the left bracket. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r209714883 --- Diff: core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.executor + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.metrics.ExecutorMetricType + +/** + * :: DeveloperApi :: + * Metrics tracked for executors and the driver. + * + * Executor-level metrics are sent from each executor to the driver as part of the Heartbeat. + */ +@DeveloperApi +class ExecutorMetrics private[spark] extends Serializable { + + // Metrics are indexed by MetricGetter.values + private val metrics = new Array[Long](ExecutorMetricType.values.length) + + // the first element is initialized to -1, indicating that the values for the array + // haven't been set yet. + metrics(0) = -1 + + /** Returns the value for the specified metricType. */ + def getMetricValue(metricType: ExecutorMetricType): Long = { +metrics(ExecutorMetricType.metricIdxMap(metricType)) + } + + /** Returns true if the values for the metrics have been set, false otherwise. 
*/ + def isSet(): Boolean = metrics(0) > -1 + + private[spark] def this(metrics: Array[Long]) { +this() +Array.copy(metrics, 0, this.metrics, 0, Math.min(metrics.size, this.metrics.size)) + } + + /** + * Constructor: create the ExecutorMetrics with the values specified. + * + * @param executorMetrics map of executor metric name to value + */ + private[spark] def this(executorMetrics: Map[String, Long]) { +this() +(0 until ExecutorMetricType.values.length).foreach { idx => + metrics(idx) = executorMetrics.getOrElse(ExecutorMetricType.values(idx).name, 0L) +} + } + + /** + * Compare the specified executor metrics values with the current executor metric values, + * and update the value for any metrics where the new value for the metric is larger. + * + * @param executorMetrics the executor metrics to compare + * @return if there is a new peak value for any metric + */ + private[spark] def compareAndUpdatePeakValues(executorMetrics: ExecutorMetrics): Boolean = { +var updated: Boolean = false --- End diff -- No need to specifically label this as `Boolean`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
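The peak-tracking logic under review, with the `Boolean` annotation dropped as suggested, can be sketched in isolation as below. This is a simplified stand-in in plain Scala: `PeakValues` and `compareAndUpdate` are hypothetical names, and the array is sized by a constructor argument rather than by `ExecutorMetricType.values`.

```scala
object PeakSketch {
  // Simplified stand-in for the class in the diff:
  // a fixed-size array holding one value per metric.
  final class PeakValues(size: Int) {
    private val values = new Array[Long](size)

    // Immutable copy of the current peaks, for inspection.
    def snapshot: Vector[Long] = values.toVector

    // Raises each stored value to the incoming one where the incoming one
    // is larger. Returns true if at least one stored value changed.
    def compareAndUpdate(incoming: Array[Long]): Boolean = {
      var updated = false // type is inferred as Boolean
      val n = math.min(values.length, incoming.length)
      (0 until n).foreach { idx =>
        if (incoming(idx) > values(idx)) {
          values(idx) = incoming(idx)
          updated = true
        }
      }
      updated
    }
  }
}
```

The Boolean result is what lets the caller skip a UI refresh when nothing reached a new peak.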
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user mccheah commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21221#discussion_r209712159

    --- Diff: core/src/main/scala/org/apache/spark/memory/MemoryManager.scala ---
    @@ -180,6 +180,26 @@ private[spark] abstract class MemoryManager(
         onHeapStorageMemoryPool.memoryUsed + offHeapStorageMemoryPool.memoryUsed
       }
    +  /**
    +   * On heap execution memory currently in use, in bytes.
    +   */
    +  final def onHeapExecutionMemoryUsed: Long = onHeapExecutionMemoryPool.memoryUsed
    --- End diff --

    It probably should be if only because the variable is annotated with `@GuardedBy(this)`, so it makes the code more consistent to mark this as synchronized.
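A minimal sketch of the consistency argument: if writes to a field happen under a lock, reads should take the same lock. The classes below are hypothetical stand-ins written in plain Scala (the guard is expressed as a comment rather than the `@GuardedBy` annotation, to avoid an extra dependency); they are not Spark's `MemoryManager` or memory pools.

```scala
object GuardedPoolSketch {
  // Hypothetical stand-in for a memory pool whose state is guarded by a lock object.
  final class Pool(lock: Object) {
    private var used = 0L // guarded by `lock`

    def memoryUsed: Long = lock.synchronized { used }
    def acquire(bytes: Long): Unit = lock.synchronized { used += bytes }
  }

  // Hypothetical manager: the getter takes the same lock as the writer.
  class Manager {
    private val onHeapExecutionPool = new Pool(this)

    def acquireExecution(bytes: Long): Unit = synchronized {
      onHeapExecutionPool.acquire(bytes)
    }

    final def onHeapExecutionMemoryUsed: Long = synchronized {
      onHeapExecutionPool.memoryUsed
    }
  }
}
```

JVM monitors are reentrant, so taking the lock in both the manager and the pool does not deadlock.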
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r209711557 --- Diff: core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.executor + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.metrics.ExecutorMetricType + +/** + * :: DeveloperApi :: + * Metrics tracked for executors and the driver. + * + * Executor-level metrics are sent from each executor to the driver as part of the Heartbeat. + */ +@DeveloperApi +class ExecutorMetrics private[spark] extends Serializable { + + // Metrics are indexed by MetricGetter.values + private val metrics = new Array[Long](ExecutorMetricType.values.length) --- End diff -- Unclear - if we expose these metrics to some external consumer via an API for example, then we almost certainly want to have a schema labelling these fields for consumption by e.g. dashboards. I think what we have here is fine for now. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
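The trade-off discussed here (a compact array indexed by metric type versus labelled fields for external consumers) can be illustrated with a small sketch. Everything below is hypothetical: the two metric names are placeholders, and `indexOf` and `toNamedMap` are invented for the example rather than taken from the pull request.

```scala
object MetricIndexSketch {
  // Hypothetical metric types; the names are placeholders, not Spark's actual list.
  sealed trait MetricType { def name: String }
  case object HeapUsed extends MetricType { val name = "HeapUsed" }
  case object OffHeapUsed extends MetricType { val name = "OffHeapUsed" }

  // Fixed ordering of metric types, and the reverse lookup into that ordering.
  val values: IndexedSeq[MetricType] = IndexedSeq(HeapUsed, OffHeapUsed)
  val indexOf: Map[MetricType, Int] = values.zipWithIndex.toMap

  // Compact storage: one Long per metric type, addressed through indexOf.
  final class Metrics(raw: Array[Long]) {
    def get(t: MetricType): Long = raw(indexOf(t))

    // Labelled view for consumers that want field names, e.g. a JSON writer.
    def toNamedMap: Map[String, Long] = values.map(t => t.name -> get(t)).toMap
  }
}
```

Keeping the array internal and exposing a named view at the API boundary is one way to get both cheap iteration and a stable schema for dashboards.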
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r207723205 --- Diff: core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.executor + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.metrics.ExecutorMetricType + +/** + * :: DeveloperApi :: + * Metrics tracked for executors and the driver. + * + * Executor-level metrics are sent from each executor to the driver as part of the Heartbeat. + */ +@DeveloperApi +class ExecutorMetrics private[spark] extends Serializable { + + // Metrics are indexed by MetricGetter.values + private val metrics = new Array[Long](ExecutorMetricType.values.length) --- End diff -- Is it likely that users would want to access the individual fields, rather than iterating through all? The 1st option would be a bit nicer if so. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r207723188 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -296,7 +338,7 @@ private[spark] object EventLoggingListener extends Logging { private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort) // A cache for compression codecs to avoid creating the same codec many times - private val codecMap = new mutable.HashMap[String, CompressionCodec] + private val codecMap = new HashMap[String, CompressionCodec] --- End diff -- Changed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r207723177 --- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala --- @@ -98,14 +103,50 @@ class ExecutorSummary private[spark]( val removeReason: Option[String], val executorLogs: Map[String, String], val memoryMetrics: Option[MemoryMetrics], -val blacklistedInStages: Set[Int]) +val blacklistedInStages: Set[Int], +@JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer]) +@JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer]) +val peakMemoryMetrics: Option[ExecutorMetrics]) class MemoryMetrics private[spark]( val usedOnHeapStorageMemory: Long, val usedOffHeapStorageMemory: Long, val totalOnHeapStorageMemory: Long, val totalOffHeapStorageMemory: Long) +/** deserializer for peakMemoryMetrics: convert map to ExecutorMetrics */ +private[spark] class ExecutorMetricsJsonDeserializer + extends JsonDeserializer[Option[ExecutorMetrics]] { + override def deserialize( + jsonParser: JsonParser, + deserializationContext: DeserializationContext): Option[ExecutorMetrics] = { +val metricsMap = jsonParser.readValueAs[Option[Map[String, Long]]]( + new TypeReference[Option[Map[String, java.lang.Long]]] {}) +metricsMap match { + case Some(metrics) => +Some(new ExecutorMetrics(metrics)) + case None => None --- End diff -- Changed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r207723173 --- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala --- @@ -98,14 +103,50 @@ class ExecutorSummary private[spark]( val removeReason: Option[String], val executorLogs: Map[String, String], val memoryMetrics: Option[MemoryMetrics], -val blacklistedInStages: Set[Int]) +val blacklistedInStages: Set[Int], +@JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer]) +@JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer]) +val peakMemoryMetrics: Option[ExecutorMetrics]) class MemoryMetrics private[spark]( val usedOnHeapStorageMemory: Long, val usedOffHeapStorageMemory: Long, val totalOnHeapStorageMemory: Long, val totalOffHeapStorageMemory: Long) +/** deserializer for peakMemoryMetrics: convert map to ExecutorMetrics */ +private[spark] class ExecutorMetricsJsonDeserializer + extends JsonDeserializer[Option[ExecutorMetrics]] { --- End diff -- Done. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r207723165 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -669,6 +686,34 @@ private[spark] class AppStatusListener( } } } + +// check if there is a new peak value for any of the executor level memory metrics +// for the live UI. SparkListenerExecutorMetricsUpdate events are only processed +// for the live UI. +event.executorUpdates.foreach { updates: ExecutorMetrics => + liveExecutors.get(event.execId).foreach { exec: LiveExecutor => +if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(updates)) { + maybeUpdate(exec, now) +} + } +} + } + + override def onStageExecutorMetrics(executorMetrics: SparkListenerStageExecutorMetrics): Unit = { +val now = System.nanoTime() + +// check if there is a new peak value for any of the executor level memory metrics, +// while reading from the log. SparkListenerStageExecutorMetrics are only processed +// when reading logs. +liveExecutors.get(executorMetrics.execId) + .orElse(deadExecutors.get(executorMetrics.execId)) match { + case Some(exec) => + if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(executorMetrics.executorMetrics)) { + maybeUpdate(exec, now) --- End diff -- Yes, this is called on replay. I've removed the "maybeUpdate". --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
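For readers following the thread: the diff relies on `compareAndUpdatePeakValues` returning `true` only when at least one stored peak was raised. A minimal sketch of that idea is below. It is not the PR's implementation; `PeakMetricsSketch`, `PeakMetrics`, and `get` are hypothetical names, and the metrics are assumed to be a fixed-size array of `Long` values, as in the quoted `ExecutorMetrics` class.

```scala
object PeakMetricsSketch {
  // Hypothetical stand-in for ExecutorMetrics: a fixed-size array of Long values.
  final class PeakMetrics(size: Int) {
    private val values = new Array[Long](size)

    def get(i: Int): Long = values(i)

    // Raise each stored value to the incoming one when the incoming one is larger.
    // Returns true if at least one peak changed.
    def compareAndUpdatePeakValues(update: Array[Long]): Boolean = {
      require(update.length == values.length, "metric arrays must have the same length")
      var updated = false
      var i = 0
      while (i < values.length) {
        if (update(i) > values(i)) {
          values(i) = update(i)
          updated = true
        }
        i += 1
      }
      updated
    }
  }
}
```

The boolean result is what lets a caller skip any follow-up work when no peak moved.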
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r207723141 --- Diff: core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala --- @@ -251,6 +261,214 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit } } + /** + * Test stage executor metrics logging functionality. This checks that peak + * values from SparkListenerExecutorMetricsUpdate events during a stage are + * logged in a StageExecutorMetrics event for each executor at stage completion. + */ + private def testStageExecutorMetricsEventLogging() { +val conf = getLoggingConf(testDirPath, None) +val logName = "stageExecutorMetrics-test" +val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf) +val listenerBus = new LiveListenerBus(conf) + +// expected StageExecutorMetrics, for the given stage id and executor id +val expectedMetricsEvents: Map[(Int, String), SparkListenerStageExecutorMetrics] = --- End diff -- Moved to after the replay. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r207723114 --- Diff: core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala --- @@ -217,7 +218,12 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp // Verify the same events are replayed in the same order assert(sc.eventLogger.isDefined) -val originalEvents = sc.eventLogger.get.loggedEvents +val originalEvents = sc.eventLogger.get.loggedEvents.filter { e => + JsonProtocol.sparkEventFromJson(e) match { --- End diff -- Changed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r207723098 --- Diff: core/src/main/scala/org/apache/spark/util/JsonProtocol.scala --- @@ -691,7 +723,19 @@ private[spark] object JsonProtocol { (json \ "Accumulator Updates").extract[List[JValue]].map(accumulableInfoFromJson) (taskId, stageId, stageAttemptId, updates) } -SparkListenerExecutorMetricsUpdate(execInfo, accumUpdates) +val executorUpdates = jsonOption(json \ "Executor Metrics Updated") match { --- End diff -- Changed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r207722892 --- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala --- @@ -98,14 +103,50 @@ class ExecutorSummary private[spark]( val removeReason: Option[String], val executorLogs: Map[String, String], val memoryMetrics: Option[MemoryMetrics], -val blacklistedInStages: Set[Int]) +val blacklistedInStages: Set[Int], +@JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer]) +@JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer]) +val peakMemoryMetrics: Option[ExecutorMetrics]) class MemoryMetrics private[spark]( val usedOnHeapStorageMemory: Long, val usedOffHeapStorageMemory: Long, val totalOnHeapStorageMemory: Long, val totalOffHeapStorageMemory: Long) +/** deserializer for peakMemoryMetrics: convert map to ExecutorMetrics */ +private[spark] class ExecutorMetricsJsonDeserializer + extends JsonDeserializer[Option[ExecutorMetrics]] { + override def deserialize( + jsonParser: JsonParser, + deserializationContext: DeserializationContext): Option[ExecutorMetrics] = { +val metricsMap = jsonParser.readValueAs[Option[Map[String, Long]]]( + new TypeReference[Option[Map[String, java.lang.Long]]] {}) +metricsMap match { + case Some(metrics) => +Some(new ExecutorMetrics(metrics)) + case None => None +} + } +} +/** serializer for peakMemoryMetrics: convert ExecutorMetrics to map with metric name as key */ +private[spark] class ExecutorMetricsJsonSerializer + extends JsonSerializer[Option[ExecutorMetrics]] { --- End diff -- It doesn't serialize -- nothing is added to the JSON. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r207722887 --- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala --- @@ -98,14 +103,50 @@ class ExecutorSummary private[spark]( val removeReason: Option[String], val executorLogs: Map[String, String], val memoryMetrics: Option[MemoryMetrics], -val blacklistedInStages: Set[Int]) +val blacklistedInStages: Set[Int], +@JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer]) +@JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer]) +val peakMemoryMetrics: Option[ExecutorMetrics]) class MemoryMetrics private[spark]( val usedOnHeapStorageMemory: Long, val usedOffHeapStorageMemory: Long, val totalOnHeapStorageMemory: Long, val totalOffHeapStorageMemory: Long) +/** deserializer for peakMemoryMetrics: convert map to ExecutorMetrics */ +private[spark] class ExecutorMetricsJsonDeserializer + extends JsonDeserializer[Option[ExecutorMetrics]] { + override def deserialize( + jsonParser: JsonParser, + deserializationContext: DeserializationContext): Option[ExecutorMetrics] = { +val metricsMap = jsonParser.readValueAs[Option[Map[String, Long]]]( + new TypeReference[Option[Map[String, java.lang.Long]]] {}) +metricsMap match { + case Some(metrics) => +Some(new ExecutorMetrics(metrics)) + case None => None +} + } +} +/** serializer for peakMemoryMetrics: convert ExecutorMetrics to map with metric name as key */ +private[spark] class ExecutorMetricsJsonSerializer + extends JsonSerializer[Option[ExecutorMetrics]] { + override def serialize( + metrics: Option[ExecutorMetrics], + jsonGenerator: JsonGenerator, + serializerProvider: SerializerProvider): Unit = { +metrics match { + case Some(m) => --- End diff -- Changed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r207722865 --- Diff: core/src/main/scala/org/apache/spark/status/LiveEntity.scala --- @@ -302,10 +305,10 @@ private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveE Option(removeReason), executorLogs, memoryMetrics, - blacklistedInStages) + blacklistedInStages, + if (peakExecutorMetrics.isSet()) Some(peakExecutorMetrics) else None) --- End diff -- Changed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r207722773 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -155,7 +160,14 @@ private[spark] class EventLoggingListener( } // Events that do not trigger a flush - override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = logEvent(event) + override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = { +logEvent(event) +if (shouldLogStageExecutorMetrics) { + // record the peak metrics for the new stage + liveStageExecutorMetrics.put((event.stageInfo.stageId, event.stageInfo.attemptNumber()), +new HashMap[String, ExecutorMetrics]()) --- End diff -- Modified. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r207722724 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -93,6 +95,9 @@ private[spark] class EventLoggingListener( // Visible for tests only. private[scheduler] val logPath = getLogPath(logBaseDir, appId, appAttemptId, compressionCodecName) + // map of (stageId, stageAttempt), to peak executor metrics for the stage + private val liveStageExecutorMetrics = HashMap[(Int, Int), HashMap[String, ExecutorMetrics]]() --- End diff -- Yes, mutable Maps will work, and better to go with the more generic version. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
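A minimal sketch of what declaring the field with the more generic `mutable.Map` type could look like. This is an illustration only: `StageMetricsMapSketch`, the `Metrics` alias (standing in for `ExecutorMetrics`), and the simplified `onStageSubmitted` signature are assumptions, not code from the PR.

```scala
import scala.collection.mutable

object StageMetricsMapSketch {
  // Hypothetical stand-in for ExecutorMetrics.
  type Metrics = Array[Long]

  // Declared as the generic mutable.Map; HashMap is only the concrete implementation.
  // Key: (stageId, stageAttempt). Value: executor id -> peak metrics for that stage.
  val liveStageExecutorMetrics: mutable.Map[(Int, Int), mutable.Map[String, Metrics]] =
    mutable.HashMap.empty[(Int, Int), mutable.Map[String, Metrics]]

  // Start tracking a newly submitted stage attempt with an empty per-executor map.
  def onStageSubmitted(stageId: Int, attempt: Int): Unit =
    liveStageExecutorMetrics((stageId, attempt)) = mutable.HashMap.empty[String, Metrics]
}
```

Callers then depend only on the `mutable.Map` interface, so the backing collection can change without touching them.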
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r207722674 --- Diff: core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.metrics + +import java.lang.management.{BufferPoolMXBean, ManagementFactory} +import javax.management.ObjectName + +import org.apache.spark.memory.MemoryManager + +/** + * Executor metric types for executor-level metrics stored in ExecutorMetrics. + */ +sealed trait ExecutorMetricType { + private[spark] def getMetricValue(memoryManager: MemoryManager): Long --- End diff -- Let's stick with the current version for now, and revisit if we end up adding more metric types. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r207722621 --- Diff: core/src/main/scala/org/apache/spark/memory/MemoryManager.scala --- @@ -180,6 +180,26 @@ private[spark] abstract class MemoryManager( onHeapStorageMemoryPool.memoryUsed + offHeapStorageMemoryPool.memoryUsed } + /** + * On heap execution memory currently in use, in bytes. + */ + final def onHeapExecutionMemoryUsed: Long = onHeapExecutionMemoryPool.memoryUsed --- End diff -- Since it accesses just one Long field, I don't think so. The other methods, which are synchronized, sum two fields. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r207073149 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -669,6 +686,34 @@ private[spark] class AppStatusListener( } } } + +// check if there is a new peak value for any of the executor level memory metrics +// for the live UI. SparkListenerExecutorMetricsUpdate events are only processed +// for the live UI. +event.executorUpdates.foreach { updates: ExecutorMetrics => + liveExecutors.get(event.execId).foreach { exec: LiveExecutor => +if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(updates)) { + maybeUpdate(exec, now) +} + } +} + } + + override def onStageExecutorMetrics(executorMetrics: SparkListenerStageExecutorMetrics): Unit = { +val now = System.nanoTime() + +// check if there is a new peak value for any of the executor level memory metrics, +// while reading from the log. SparkListenerStageExecutorMetrics are only processed +// when reading logs. +liveExecutors.get(executorMetrics.execId) + .orElse(deadExecutors.get(executorMetrics.execId)) match { + case Some(exec) => --- End diff -- Ok, this can stay as is. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r207037118 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -669,6 +686,34 @@ private[spark] class AppStatusListener( } } } + +// check if there is a new peak value for any of the executor level memory metrics +// for the live UI. SparkListenerExecutorMetricsUpdate events are only processed +// for the live UI. +event.executorUpdates.foreach { updates: ExecutorMetrics => + liveExecutors.get(event.execId).foreach { exec: LiveExecutor => +if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(updates)) { + maybeUpdate(exec, now) +} + } +} + } + + override def onStageExecutorMetrics(executorMetrics: SparkListenerStageExecutorMetrics): Unit = { +val now = System.nanoTime() + +// check if there is a new peak value for any of the executor level memory metrics, +// while reading from the log. SparkListenerStageExecutorMetrics are only processed +// when reading logs. +liveExecutors.get(executorMetrics.execId) + .orElse(deadExecutors.get(executorMetrics.execId)) match { + case Some(exec) => --- End diff -- yeah, but you're talking about both a `foreach` *and* an `if` together. A long time back we discussed using `option.fold` for this, as it is all in one function, but we rejected it as being pretty confusing for most developers.
```scala
scala> def foo(x: Option[String]) = x.fold("nada")("some " + _)
foo: (x: Option[String])String

scala> foo(None)
res0: String = nada

scala> foo(Some("blah"))
res1: String = some blah
```
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r207016169 --- Diff: core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.metrics + +import java.lang.management.{BufferPoolMXBean, ManagementFactory} +import javax.management.ObjectName + +import org.apache.spark.memory.MemoryManager + +/** + * Executor metric types for executor-level metrics stored in ExecutorMetrics. + */ +sealed trait ExecutorMetricType { + private[spark] def getMetricValue(memoryManager: MemoryManager): Long --- End diff -- Yup, on my end this is a low conviction suggestion - we might start feeling pain around this as we add more metric types, but for a first pass this is probably fine. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r207004345 --- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala --- @@ -98,14 +103,50 @@ class ExecutorSummary private[spark]( val removeReason: Option[String], val executorLogs: Map[String, String], val memoryMetrics: Option[MemoryMetrics], -val blacklistedInStages: Set[Int]) +val blacklistedInStages: Set[Int], +@JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer]) +@JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer]) +val peakMemoryMetrics: Option[ExecutorMetrics]) class MemoryMetrics private[spark]( val usedOnHeapStorageMemory: Long, val usedOffHeapStorageMemory: Long, val totalOnHeapStorageMemory: Long, val totalOffHeapStorageMemory: Long) +/** deserializer for peakMemoryMetrics: convert map to ExecutorMetrics */ +private[spark] class ExecutorMetricsJsonDeserializer + extends JsonDeserializer[Option[ExecutorMetrics]] { + override def deserialize( + jsonParser: JsonParser, + deserializationContext: DeserializationContext): Option[ExecutorMetrics] = { +val metricsMap = jsonParser.readValueAs[Option[Map[String, Long]]]( + new TypeReference[Option[Map[String, java.lang.Long]]] {}) +metricsMap match { + case Some(metrics) => +Some(new ExecutorMetrics(metrics)) + case None => None --- End diff -- same here as matt's comments, use `metricsMap.map {}` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
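The suggestion above can be sketched as follows, showing the pattern match from the diff next to the equivalent `Option.map` form. `OptionMapSketch` and the `Metrics` case class are hypothetical stand-ins for the PR's `ExecutorMetrics` constructor, used only to keep the example self-contained.

```scala
object OptionMapSketch {
  // Hypothetical stand-in for ExecutorMetrics built from a name -> value map.
  final case class Metrics(values: Map[String, Long])

  // Pattern-match version, mirroring the shape of the code under review.
  def withMatch(metricsMap: Option[Map[String, Long]]): Option[Metrics] =
    metricsMap match {
      case Some(metrics) => Some(Metrics(metrics))
      case None => None
    }

  // Equivalent version using Option.map: None stays None, Some is transformed.
  def withMap(metricsMap: Option[Map[String, Long]]): Option[Metrics] =
    metricsMap.map(metrics => Metrics(metrics))
}
```

Both functions return the same result for every input; the `map` form simply drops the explicit `None` case.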
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r207004013 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -669,6 +686,34 @@ private[spark] class AppStatusListener( } } } + +// check if there is a new peak value for any of the executor level memory metrics +// for the live UI. SparkListenerExecutorMetricsUpdate events are only processed +// for the live UI. +event.executorUpdates.foreach { updates: ExecutorMetrics => + liveExecutors.get(event.execId).foreach { exec: LiveExecutor => +if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(updates)) { + maybeUpdate(exec, now) +} + } +} + } + + override def onStageExecutorMetrics(executorMetrics: SparkListenerStageExecutorMetrics): Unit = { +val now = System.nanoTime() + +// check if there is a new peak value for any of the executor level memory metrics, +// while reading from the log. SparkListenerStageExecutorMetrics are only processed +// when reading logs. +liveExecutors.get(executorMetrics.execId) + .orElse(deadExecutors.get(executorMetrics.execId)) match { + case Some(exec) => + if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(executorMetrics.executorMetrics)) { + maybeUpdate(exec, now) --- End diff -- This is only for replaying from logs, right? But then `maybeUpdate` will be a no-op, as `live` is false. So what is the desired effect here? Maybe I'm missing something, because this is covered by some of the api tests. I think you may not need to call update at all here; it'll just get written to the kvstore as part of the final `flush` call when the kvstore is closed after reading the whole log. Also, style nit: indentation is off. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r207004094 --- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala --- @@ -98,14 +103,50 @@ class ExecutorSummary private[spark]( val removeReason: Option[String], val executorLogs: Map[String, String], val memoryMetrics: Option[MemoryMetrics], -val blacklistedInStages: Set[Int]) +val blacklistedInStages: Set[Int], +@JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer]) +@JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer]) +val peakMemoryMetrics: Option[ExecutorMetrics]) class MemoryMetrics private[spark]( val usedOnHeapStorageMemory: Long, val usedOffHeapStorageMemory: Long, val totalOnHeapStorageMemory: Long, val totalOffHeapStorageMemory: Long) +/** deserializer for peakMemoryMetrics: convert map to ExecutorMetrics */ +private[spark] class ExecutorMetricsJsonDeserializer + extends JsonDeserializer[Option[ExecutorMetrics]] { --- End diff -- nit: double indent the `extends` line --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r20799 --- Diff: core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala --- @@ -251,6 +261,214 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit } } + /** + * Test stage executor metrics logging functionality. This checks that peak + * values from SparkListenerExecutorMetricsUpdate events during a stage are + * logged in a StageExecutorMetrics event for each executor at stage completion. + */ + private def testStageExecutorMetricsEventLogging() { +val conf = getLoggingConf(testDirPath, None) +val logName = "stageExecutorMetrics-test" +val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf) +val listenerBus = new LiveListenerBus(conf) + +// expected StageExecutorMetrics, for the given stage id and executor id +val expectedMetricsEvents: Map[(Int, String), SparkListenerStageExecutorMetrics] = --- End diff -- super nit: I'd move these expectations after the events to post, it follows a bit more naturally. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r207006413 --- Diff: core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.metrics + +import java.lang.management.{BufferPoolMXBean, ManagementFactory} +import javax.management.ObjectName + +import org.apache.spark.memory.MemoryManager + +/** + * Executor metric types for executor-level metrics stored in ExecutorMetrics. + */ +sealed trait ExecutorMetricType { + private[spark] def getMetricValue(memoryManager: MemoryManager): Long --- End diff -- I appreciate the comparison with this alternative, but I don't see a big problem w/ the initial version. If there were many different types for `T` that were really different, then I think I'd be more inclined to do that -- might still come to that as more types are added. (I'm also the one who suggested the current version, so I'm biased) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
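For context, a rough sketch of the design being kept: a sealed trait whose members each know how to read one value, plus a fixed `values` sequence that defines the array index of each metric. Everything here is an assumption made for illustration. `MetricTypeSketch`, `MemorySnapshot` (a stand-in for `MemoryManager`), the two case objects, and `snapshot` are hypothetical and are not taken from the PR.

```scala
object MetricTypeSketch {
  // Hypothetical stand-in for MemoryManager, reduced to two readings.
  final case class MemorySnapshot(onHeapExecution: Long, offHeapExecution: Long)

  // Each metric type knows how to extract its own value from the memory source.
  sealed trait MetricType {
    def name: String
    def getMetricValue(memory: MemorySnapshot): Long
  }

  case object OnHeapExecution extends MetricType {
    val name = "OnHeapExecutionMemory"
    def getMetricValue(memory: MemorySnapshot): Long = memory.onHeapExecution
  }

  case object OffHeapExecution extends MetricType {
    val name = "OffHeapExecutionMemory"
    def getMetricValue(memory: MemorySnapshot): Long = memory.offHeapExecution
  }

  // The position of a type in this sequence is its index in the metrics array.
  val values: IndexedSeq[MetricType] = IndexedSeq(OnHeapExecution, OffHeapExecution)

  // Read every metric once, in index order.
  def snapshot(memory: MemorySnapshot): Array[Long] =
    values.map(_.getMetricValue(memory)).toArray
}
```

With this shape, adding a metric means adding one case object and one entry in `values`; the pain mentioned in the thread would appear only if new metric types needed inputs other than the shared memory source.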
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r207004997 --- Diff: core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.executor + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.metrics.ExecutorMetricType + +/** + * :: DeveloperApi :: + * Metrics tracked for executors and the driver. + * + * Executor-level metrics are sent from each executor to the driver as part of the Heartbeat. + */ +@DeveloperApi +class ExecutorMetrics private[spark] extends Serializable { + + // Metrics are indexed by MetricGetter.values + private val metrics = new Array[Long](ExecutorMetricType.values.length) --- End diff --
Yup, that's fine - I did some googling, and unfortunately there isn't a great way to iterate over the fields of a case class. You could create a thin wrapper object around the array instead, though, if we really think the nicer API is worthwhile:
```
case class Metrics(values: Seq[Long]) {
  def someMetric1(): Long = values(0)
  ...
}
```
Or even this:
```
case class Metrics(metric1: Long, metric2: Long, metric3: Long, ...) {
  def values(): Seq[Long] = Seq(metric1, metric2, metric3, ...)
}
```
The latter would be better because you'd be guaranteed to create the struct with the right number of metrics. Such abstractions are not necessary by any means, though.
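A minimal sketch of the "thin wrapper around the array" idea from the comment above. The class name `PeakMetrics`, the three accessor names, and the fixed size are hypothetical and chosen only for illustration; they are not Spark's actual metric list or API.

```scala
// Hypothetical array-backed wrapper: named accessors on top of index-based storage.
final class PeakMetrics private (private val values: Array[Long]) {
  def jvmHeap: Long = values(PeakMetrics.JvmHeap)
  def onHeapExecution: Long = values(PeakMetrics.OnHeapExecution)
  def offHeapExecution: Long = values(PeakMetrics.OffHeapExecution)

  // Immutable view for code that wants to iterate over every metric.
  def toSeq: Seq[Long] = values.toList
}

object PeakMetrics {
  // Index of each (illustrative) metric in the backing array.
  private val JvmHeap = 0
  private val OnHeapExecution = 1
  private val OffHeapExecution = 2
  val size = 3

  // Reject inputs with the wrong number of metrics at construction time.
  def apply(values: Seq[Long]): PeakMetrics = {
    require(values.length == size, s"expected $size metrics, got ${values.length}")
    new PeakMetrics(values.toArray)
  }
}
```

The `require` check gives a runtime version of the guarantee that the second case-class variant would provide at compile time.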
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r207002859 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -669,6 +686,34 @@ private[spark] class AppStatusListener( } } } + +// check if there is a new peak value for any of the executor level memory metrics +// for the live UI. SparkListenerExecutorMetricsUpdate events are only processed +// for the live UI. +event.executorUpdates.foreach { updates: ExecutorMetrics => + liveExecutors.get(event.execId).foreach { exec: LiveExecutor => +if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(updates)) { + maybeUpdate(exec, now) +} + } +} + } + + override def onStageExecutorMetrics(executorMetrics: SparkListenerStageExecutorMetrics): Unit = { +val now = System.nanoTime() + +// check if there is a new peak value for any of the executor level memory metrics, +// while reading from the log. SparkListenerStageExecutorMetrics are only processed +// when reading logs. +liveExecutors.get(executorMetrics.execId) + .orElse(deadExecutors.get(executorMetrics.execId)) match { + case Some(exec) => --- End diff -- From the [Scaladoc](https://www.scala-lang.org/api/2.10.2/index.html#scala.Option): > The most idiomatic way to use an scala.Option instance is to treat it as a collection or monad and use map,flatMap, filter, or foreach It's probably better to follow the Scala conventions here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r206998585 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -669,6 +686,34 @@ private[spark] class AppStatusListener( } } } + +// check if there is a new peak value for any of the executor level memory metrics +// for the live UI. SparkListenerExecutorMetricsUpdate events are only processed +// for the live UI. +event.executorUpdates.foreach { updates: ExecutorMetrics => + liveExecutors.get(event.execId).foreach { exec: LiveExecutor => +if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(updates)) { + maybeUpdate(exec, now) +} + } +} + } + + override def onStageExecutorMetrics(executorMetrics: SparkListenerStageExecutorMetrics): Unit = { +val now = System.nanoTime() + +// check if there is a new peak value for any of the executor level memory metrics, +// while reading from the log. SparkListenerStageExecutorMetrics are only processed +// when reading logs. +liveExecutors.get(executorMetrics.execId) + .orElse(deadExecutors.get(executorMetrics.execId)) match { + case Some(exec) => --- End diff -- I don't think we really make a point about this code style. In general we prefer the functional versions, but I actually find a match on Some & None totally fine. There are plenty of other examples of this in the code base (though that doesn't necessarily mean it's the right style ...)
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r206995945 --- Diff: core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.executor + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.metrics.ExecutorMetricType + +/** + * :: DeveloperApi :: + * Metrics tracked for executors and the driver. + * + * Executor-level metrics are sent from each executor to the driver as part of the Heartbeat. + */ +@DeveloperApi +class ExecutorMetrics private[spark] extends Serializable { + + // Metrics are indexed by MetricGetter.values + private val metrics = new Array[Long](ExecutorMetricType.values.length) --- End diff --
I suggested this earlier in the reviews. Most of the operations for dealing with this data want to iterate over all the fields. It's much easier this way vs. having a bazillion
```scala
if (x.fizz > y.fizz) {
  y.fizz = x.fizz
}
if (x.buzz > y.buzz) {
  y.buzz = x.buzz
}
...
```
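To illustrate why the array representation keeps peak tracking short, here is a hedged sketch of a loop-based peak update. The method name `compareAndUpdatePeakValues` appears in the diff under review, but the `PeakTracker` class and the body shown here are assumptions written for illustration, not the PR's implementation.

```scala
// Hypothetical tracker: one loop replaces a per-field "if (x.f > y.f) y.f = x.f" chain.
final class PeakTracker(size: Int) {
  private val peaks = new Array[Long](size)

  // Returns true if at least one stored peak was raised by this update.
  def compareAndUpdatePeakValues(update: Array[Long]): Boolean = {
    require(update.length == peaks.length, "metric arrays must have the same length")
    var updated = false
    var i = 0
    while (i < peaks.length) {
      if (update(i) > peaks(i)) {
        peaks(i) = update(i)
        updated = true
      }
      i += 1
    }
    updated
  }

  // Immutable copy of the current peaks.
  def snapshot: Seq[Long] = peaks.toList
}
```

Adding a new metric then only changes the array length, not the comparison code.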
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r206335138 --- Diff: core/src/main/scala/org/apache/spark/util/JsonProtocol.scala --- @@ -691,7 +723,19 @@ private[spark] object JsonProtocol { (json \ "Accumulator Updates").extract[List[JValue]].map(accumulableInfoFromJson) (taskId, stageId, stageAttemptId, updates) } -SparkListenerExecutorMetricsUpdate(execInfo, accumUpdates) +val executorUpdates = jsonOption(json \ "Executor Metrics Updated") match { --- End diff -- `Option.map` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r206333086 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -93,6 +95,9 @@ private[spark] class EventLoggingListener( // Visible for tests only. private[scheduler] val logPath = getLogPath(logBaseDir, appId, appAttemptId, compressionCodecName) + // map of (stageId, stageAttempt), to peak executor metrics for the stage + private val liveStageExecutorMetrics = HashMap[(Int, Int), HashMap[String, ExecutorMetrics]]() --- End diff -- Do these have to be `HashMap`s, or can they be just `mutable.Map`s? You can instantiate them with `mutable.Map.empty[..., ...]`.
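A small sketch of the suggestion above: declaring the nested registry against the `mutable.Map` interface and creating instances with `mutable.Map.empty`. The object name `StagePeaks`, the type alias, and the use of `Array[Long]` in place of `ExecutorMetrics` are assumptions made so the example is self-contained.

```scala
import scala.collection.mutable

object StagePeaks {
  // executorId -> peak metric values (Array[Long] stands in for ExecutorMetrics here).
  type ExecutorPeaks = mutable.Map[String, Array[Long]]

  // (stageId, stageAttemptId) -> per-executor peaks, typed against the interface.
  def newRegistry(): mutable.Map[(Int, Int), ExecutorPeaks] =
    mutable.Map.empty[(Int, Int), ExecutorPeaks]

  // Register a newly submitted stage with an empty per-executor map.
  def registerStage(
      registry: mutable.Map[(Int, Int), ExecutorPeaks],
      stageId: Int,
      attempt: Int): Unit = {
    registry((stageId, attempt)) = mutable.Map.empty[String, Array[Long]]
  }
}
```

Note that `mutable.Map.empty[K, V]` takes no trailing `()`; the default implementation it returns is still a hash map.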
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r206312334 --- Diff: core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.executor + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.metrics.ExecutorMetricType + +/** + * :: DeveloperApi :: + * Metrics tracked for executors and the driver. + * + * Executor-level metrics are sent from each executor to the driver as part of the Heartbeat. + */ +@DeveloperApi +class ExecutorMetrics private[spark] extends Serializable { + + // Metrics are indexed by MetricGetter.values + private val metrics = new Array[Long](ExecutorMetricType.values.length) --- End diff -- Out of curiosity, why are we using an array here with index-based fetching? We could use a struct / case class to represent these metrics. But I suppose the size of the payload we send is smaller if we use an Array, and we don't want to pay serialization costs? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r206334599 --- Diff: core/src/main/scala/org/apache/spark/status/LiveEntity.scala --- @@ -302,10 +305,10 @@ private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveE Option(removeReason), executorLogs, memoryMetrics, - blacklistedInStages) + blacklistedInStages, + if (peakExecutorMetrics.isSet()) Some(peakExecutorMetrics) else None) --- End diff -- `Some(peakExecutorMetrics).filter(_.isSet)` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
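The one-line suggestion above can be checked against the original `if`/`else` form with a short sketch. The `Peaks` class and its `isSet` rule (any value above zero) are hypothetical stand-ins; only the `Some(x).filter(_.isSet)` pattern comes from the comment.

```scala
// Hypothetical stand-in for the peak metrics holder.
final class Peaks(values: Array[Long]) {
  // Assumed meaning of "set": at least one metric has been recorded.
  def isSet: Boolean = values.exists(_ > 0L)
}

object PeaksExample {
  // Form used in the diff under review.
  def verbose(p: Peaks): Option[Peaks] = if (p.isSet) Some(p) else None

  // Form proposed in the review comment; behaves the same way.
  def concise(p: Peaks): Option[Peaks] = Some(p).filter(_.isSet)
}
```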
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r206333224 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -155,7 +160,14 @@ private[spark] class EventLoggingListener( } // Events that do not trigger a flush - override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = logEvent(event) + override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = { +logEvent(event) +if (shouldLogStageExecutorMetrics) { + // record the peak metrics for the new stage + liveStageExecutorMetrics.put((event.stageInfo.stageId, event.stageInfo.attemptNumber()), +new HashMap[String, ExecutorMetrics]()) --- End diff -- `mutable.Map.empty[String, ExecutorMetrics]` if the above signature is changed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r206334329 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -669,6 +686,34 @@ private[spark] class AppStatusListener( } } } + +// check if there is a new peak value for any of the executor level memory metrics +// for the live UI. SparkListenerExecutorMetricsUpdate events are only processed +// for the live UI. +event.executorUpdates.foreach { updates: ExecutorMetrics => + liveExecutors.get(event.execId).foreach { exec: LiveExecutor => +if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(updates)) { + maybeUpdate(exec, now) +} + } +} + } + + override def onStageExecutorMetrics(executorMetrics: SparkListenerStageExecutorMetrics): Unit = { +val now = System.nanoTime() + +// check if there is a new peak value for any of the executor level memory metrics, +// while reading from the log. SparkListenerStageExecutorMetrics are only processed +// when reading logs. +liveExecutors.get(executorMetrics.execId) + .orElse(deadExecutors.get(executorMetrics.execId)) match { + case Some(exec) => --- End diff --
Don't pattern match on `Some` and `None` - for options, prefer the functional equivalents, and occasionally use `isEmpty` / `isDefined`. We can do this here:
```
val opt = liveExecutors.get(...).orElse(...)
opt.foreach { ... }
if (opt.isEmpty) { logWarning(...) }
```
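The two styles debated in this thread can be compared side by side. This is a sketch under stated assumptions: `LookupDemo`, its string-valued maps, and the in-memory `log` buffer are hypothetical and replace the listener's executor maps and `logWarning` so the behaviour can be observed.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical demo: executors are plain strings and side effects go to a buffer.
final class LookupDemo(live: Map[String, String], dead: Map[String, String]) {
  val log = ArrayBuffer.empty[String]

  // Style in the diff: pattern match on Some / None.
  def withMatch(id: String): Unit =
    live.get(id).orElse(dead.get(id)) match {
      case Some(exec) => log += s"update $exec"
      case None => log += s"warn: no executor $id"
    }

  // Style suggested in review: foreach for the hit, isEmpty for the miss.
  def withForeach(id: String): Unit = {
    val opt = live.get(id).orElse(dead.get(id))
    opt.foreach(exec => log += s"update $exec")
    if (opt.isEmpty) log += s"warn: no executor $id"
  }
}
```

Both methods record the same entries for a given id, so the choice is stylistic, which matches the disagreement between the reviewers.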
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r206334970 --- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala --- @@ -98,14 +103,50 @@ class ExecutorSummary private[spark]( val removeReason: Option[String], val executorLogs: Map[String, String], val memoryMetrics: Option[MemoryMetrics], -val blacklistedInStages: Set[Int]) +val blacklistedInStages: Set[Int], +@JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer]) +@JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer]) +val peakMemoryMetrics: Option[ExecutorMetrics]) class MemoryMetrics private[spark]( val usedOnHeapStorageMemory: Long, val usedOffHeapStorageMemory: Long, val totalOnHeapStorageMemory: Long, val totalOffHeapStorageMemory: Long) +/** deserializer for peakMemoryMetrics: convert map to ExecutorMetrics */ +private[spark] class ExecutorMetricsJsonDeserializer + extends JsonDeserializer[Option[ExecutorMetrics]] { + override def deserialize( + jsonParser: JsonParser, + deserializationContext: DeserializationContext): Option[ExecutorMetrics] = { +val metricsMap = jsonParser.readValueAs[Option[Map[String, Long]]]( + new TypeReference[Option[Map[String, java.lang.Long]]] {}) +metricsMap match { + case Some(metrics) => +Some(new ExecutorMetrics(metrics)) + case None => None +} + } +} +/** serializer for peakMemoryMetrics: convert ExecutorMetrics to map with metric name as key */ +private[spark] class ExecutorMetricsJsonSerializer + extends JsonSerializer[Option[ExecutorMetrics]] { --- End diff -- If this is empty, does it serialize as `null`, or is it not serialized at all? Or does it serialize as some other token, like `empty`?
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r206312790 --- Diff: core/src/main/scala/org/apache/spark/memory/MemoryManager.scala --- @@ -180,6 +180,26 @@ private[spark] abstract class MemoryManager( onHeapStorageMemoryPool.memoryUsed + offHeapStorageMemoryPool.memoryUsed } + /** + * On heap execution memory currently in use, in bytes. + */ + final def onHeapExecutionMemoryUsed: Long = onHeapExecutionMemoryPool.memoryUsed --- End diff -- Do these have to be `synchronized`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r206333488 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -296,7 +338,7 @@ private[spark] object EventLoggingListener extends Logging { private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort) // A cache for compression codecs to avoid creating the same codec many times - private val codecMap = new mutable.HashMap[String, CompressionCodec] + private val codecMap = new HashMap[String, CompressionCodec] --- End diff -- `mutable.Map.empty` - prefer this everywhere. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r206335296 --- Diff: core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala --- @@ -217,7 +218,12 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp // Verify the same events are replayed in the same order assert(sc.eventLogger.isDefined) -val originalEvents = sc.eventLogger.get.loggedEvents +val originalEvents = sc.eventLogger.get.loggedEvents.filter { e => + JsonProtocol.sparkEventFromJson(e) match { --- End diff -- `isInstanceOf` may be cleaner here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
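A hedged sketch of the `isInstanceOf` alternative mentioned above. The diff excerpt does not show which events the test filters out, so the assumption here is that stage executor metrics events are dropped; the `Event` hierarchy and `ReplayFilter` object are hypothetical.

```scala
// Hypothetical event hierarchy standing in for SparkListenerEvent subclasses.
sealed trait Event
case class StageSubmitted(stageId: Int) extends Event
case class StageExecutorMetrics(execId: String, stageId: Int) extends Event

object ReplayFilter {
  // Filtering with a pattern match, as in the diff under review.
  def viaMatch(events: Seq[Event]): Seq[Event] = events.filter {
    case _: StageExecutorMetrics => false
    case _ => true
  }

  // Filtering with isInstanceOf, as suggested in the comment.
  def viaIsInstanceOf(events: Seq[Event]): Seq[Event] =
    events.filterNot(_.isInstanceOf[StageExecutorMetrics])
}
```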
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r206331772 --- Diff: core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.metrics + +import java.lang.management.{BufferPoolMXBean, ManagementFactory} +import javax.management.ObjectName + +import org.apache.spark.memory.MemoryManager + +/** + * Executor metric types for executor-level metrics stored in ExecutorMetrics. + */ +sealed trait ExecutorMetricType { + private[spark] def getMetricValue(memoryManager: MemoryManager): Long --- End diff --
A few minor concerns with this design:
- When the memory manager is passed to the bean-sourced metrics provider, it ends up unused.
- There's an inconsistency: the metric types that use the memory manager accept it as an argument, but the metric types that use the memory beans construct the beans as singletons.

The following proposal avoids both issues but is a bit more complex. I'm open to discussion on whether the extra complexity is worthwhile here.
```
sealed trait ExecutorMetricType[T] {
  private[spark] def getMetricValue(src: T): Long
  private[spark] def name: String
}

case class MBeanMetricType(mbeanName: String)
  extends ExecutorMetricType[BufferPoolMXBean] {
  override def name: String = ... // derive some name from the bean name, most likely
  override def getMetricValue(bean: BufferPoolMXBean): Long = bean.getMemoryUsed
}

case object OnHeapExecutionMemory extends ExecutorMetricType[MemoryManager] {
  override def name: String = "OnHeapExecution"
  override def getMetricValue(memoryManager: MemoryManager): Long =
    memoryManager.onHeapExecutionMemoryUsed
}

private[spark] object ExecutorMetricType {
  // Add all subtypes of ExecutorMetricType[MemoryManager] here.
  val memoryManagerMetrics = IndexedSeq(
    OnHeapExecutionMemory,
    OffHeapExecutionMemory)

  private val BUFFER_POOL_BEAN_NAME = "java.nio:type=BufferPool,name=direct"
  private val MAPPED_POOL_BEAN_NAME = "java.nio:type=BufferPool,name=mapped"

  val mbeanMetrics = IndexedSeq(
    (MBeanMetricType(BUFFER_POOL_BEAN_NAME), getBean(BUFFER_POOL_BEAN_NAME)),
    (MBeanMetricType(MAPPED_POOL_BEAN_NAME), getBean(MAPPED_POOL_BEAN_NAME)))

  // Just for length? There might be a simpler way
  val values = memoryManagerMetrics ++ mbeanMetrics.map(_._1)

  def getMetricsSummary(memoryManager: MemoryManager): ExecutorMetrics = {
    val allMetrics = new Array[Long](values.length)
    memoryManagerMetrics.zipWithIndex.foreach { case (metric, index) =>
      allMetrics(index) = metric.getMetricValue(memoryManager)
    }
    mbeanMetrics.zipWithIndex.foreach { case ((metric, bean), index) =>
      allMetrics(index + memoryManagerMetrics.length) = metric.getMetricValue(bean)
    }
    new ExecutorMetrics(allMetrics)
  }

  private def getBean(beanName: String): BufferPoolMXBean = {
    ManagementFactory.newPlatformMXBeanProxy(
      ManagementFactory.getPlatformMBeanServer,
      new ObjectName(beanName).toString,
      classOf[BufferPoolMXBean])
  }
}
```
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r206334704 --- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala --- @@ -98,14 +103,50 @@ class ExecutorSummary private[spark]( val removeReason: Option[String], val executorLogs: Map[String, String], val memoryMetrics: Option[MemoryMetrics], -val blacklistedInStages: Set[Int]) +val blacklistedInStages: Set[Int], +@JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer]) +@JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer]) +val peakMemoryMetrics: Option[ExecutorMetrics]) class MemoryMetrics private[spark]( val usedOnHeapStorageMemory: Long, val usedOffHeapStorageMemory: Long, val totalOnHeapStorageMemory: Long, val totalOffHeapStorageMemory: Long) +/** deserializer for peakMemoryMetrics: convert map to ExecutorMetrics */ +private[spark] class ExecutorMetricsJsonDeserializer + extends JsonDeserializer[Option[ExecutorMetrics]] { + override def deserialize( + jsonParser: JsonParser, + deserializationContext: DeserializationContext): Option[ExecutorMetrics] = { +val metricsMap = jsonParser.readValueAs[Option[Map[String, Long]]]( + new TypeReference[Option[Map[String, java.lang.Long]]] {}) +metricsMap match { + case Some(metrics) => +Some(new ExecutorMetrics(metrics)) + case None => None +} + } +} +/** serializer for peakMemoryMetrics: convert ExecutorMetrics to map with metric name as key */ +private[spark] class ExecutorMetricsJsonSerializer + extends JsonSerializer[Option[ExecutorMetrics]] { + override def serialize( + metrics: Option[ExecutorMetrics], + jsonGenerator: JsonGenerator, + serializerProvider: SerializerProvider): Unit = { +metrics match { + case Some(m) => --- End diff -- Don't use `Some` and `None` case matching - prefer `foreach`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r205095575 --- Diff: core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala --- @@ -251,6 +261,215 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit } } + /** + * Test stage executor metrics logging functionality. This checks that peak + * values from SparkListenerExecutorMetricsUpdate events during a stage are + * logged in a StageExecutorMetrics event for each executor at stage completion. + */ + private def testStageExecutorMetricsEventLogging() { +val conf = getLoggingConf(testDirPath, None) +val logName = "stageExecutorMetrics-test" +val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf) +val listenerBus = new LiveListenerBus(conf) + +// expected StageExecutorMetrics, for the given stage id and executor id +val expectedMetricsEvents: Map[(Int, String), SparkListenerStageExecutorMetrics] = + Map( +((0, "1"), + new SparkListenerStageExecutorMetrics("1", 0, 0, + Array(5000L, 50L, 50L, 20L, 50L, 10L, 100L, 30L, 70L, 20L))), +((0, "2"), + new SparkListenerStageExecutorMetrics("2", 0, 0, + Array(7000L, 70L, 50L, 20L, 10L, 10L, 50L, 30L, 80L, 40L))), +((1, "1"), + new SparkListenerStageExecutorMetrics("1", 1, 0, + Array(7000L, 70L, 50L, 30L, 60L, 30L, 80L, 55L, 50L, 0L))), +((1, "2"), + new SparkListenerStageExecutorMetrics("2", 1, 0, + Array(7000L, 70L, 50L, 40L, 10L, 30L, 50L, 60L, 40L, 40L + +// Events to post. 
+val events = Array( + SparkListenerApplicationStart("executionMetrics", None, +1L, "update", None), + createExecutorAddedEvent(1), + createExecutorAddedEvent(2), + createStageSubmittedEvent(0), + // receive 3 metric updates from each executor with just stage 0 running, + // with different peak updates for each executor + createExecutorMetricsUpdateEvent(1, + Array(4000L, 50L, 20L, 0L, 40L, 0L, 60L, 0L, 70L, 20L)), + createExecutorMetricsUpdateEvent(2, + Array(1500L, 50L, 20L, 0L, 0L, 0L, 20L, 0L, 70L, 0L)), + // exec 1: new stage 0 peaks for metrics at indexes: 2, 4, 6 + createExecutorMetricsUpdateEvent(1, + Array(4000L, 50L, 50L, 0L, 50L, 0L, 100L, 0L, 70L, 20L)), + // exec 2: new stage 0 peaks for metrics at indexes: 0, 4, 6 + createExecutorMetricsUpdateEvent(2, + Array(2000L, 50L, 10L, 0L, 10L, 0L, 30L, 0L, 70L, 0L)), + // exec 1: new stage 0 peaks for metrics at indexes: 5, 7 + createExecutorMetricsUpdateEvent(1, + Array(2000L, 40L, 50L, 0L, 40L, 10L, 90L, 10L, 50L, 0L)), + // exec 2: new stage 0 peaks for metrics at indexes: 0, 5, 6, 7, 8 + createExecutorMetricsUpdateEvent(2, + Array(3500L, 50L, 15L, 0L, 10L, 10L, 35L, 10L, 80L, 0L)), + // now start stage 1, one more metric update for each executor, and new + // peaks for some stage 1 metrics (as listed), initialize stage 1 peaks + createStageSubmittedEvent(1), + // exec 1: new stage 0 peaks for metrics at indexes: 0, 3, 7 --- End diff -- Stage 0 is still running, and these are new peaks for that stage. It is also initializing all the stage 1 metric values, since these are the first executor metrics seen for stage 1 (I'll add this to the comments). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
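The reply above says a single executor update can raise peaks for a stage that is still running while also initializing peaks for a newly submitted stage. A minimal sketch of that bookkeeping follows; `StagePeakTracker` and its method names are hypothetical, stage attempts are omitted for brevity, and this is not the `EventLoggingListener` code from the PR.

```scala
import scala.collection.mutable

// Hypothetical tracker: every update is applied to all stages that are currently live.
final class StagePeakTracker(numMetrics: Int) {
  // stageId -> executorId -> peak values
  private val live = mutable.Map.empty[Int, mutable.Map[String, Array[Long]]]

  def stageSubmitted(stageId: Int): Unit =
    live(stageId) = mutable.Map.empty[String, Array[Long]]

  def executorUpdate(execId: String, update: Array[Long]): Unit =
    live.values.foreach { perExec =>
      // The first update seen for a stage initializes that stage's peaks from zero.
      val peaks = perExec.getOrElseUpdate(execId, new Array[Long](numMetrics))
      for (i <- peaks.indices) peaks(i) = math.max(peaks(i), update(i))
    }

  // Removes the stage and returns its per-executor peaks.
  def stageCompleted(stageId: Int): Map[String, Seq[Long]] =
    live.remove(stageId).toList.flatMap(_.toList)
      .map { case (id, p) => id -> p.toList }.toMap
}
```

With stage 0 and stage 1 both live, one update from executor "1" can set a new stage 0 peak for some metrics and, at the same time, become the initial stage 1 peak for all of them.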
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user rezasafi commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r204976606 --- Diff: core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala --- @@ -251,6 +261,215 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit } } + /** + * Test stage executor metrics logging functionality. This checks that peak + * values from SparkListenerExecutorMetricsUpdate events during a stage are + * logged in a StageExecutorMetrics event for each executor at stage completion. + */ + private def testStageExecutorMetricsEventLogging() { +val conf = getLoggingConf(testDirPath, None) +val logName = "stageExecutorMetrics-test" +val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf) +val listenerBus = new LiveListenerBus(conf) + +// expected StageExecutorMetrics, for the given stage id and executor id +val expectedMetricsEvents: Map[(Int, String), SparkListenerStageExecutorMetrics] = + Map( +((0, "1"), + new SparkListenerStageExecutorMetrics("1", 0, 0, + Array(5000L, 50L, 50L, 20L, 50L, 10L, 100L, 30L, 70L, 20L))), +((0, "2"), + new SparkListenerStageExecutorMetrics("2", 0, 0, + Array(7000L, 70L, 50L, 20L, 10L, 10L, 50L, 30L, 80L, 40L))), +((1, "1"), + new SparkListenerStageExecutorMetrics("1", 1, 0, + Array(7000L, 70L, 50L, 30L, 60L, 30L, 80L, 55L, 50L, 0L))), +((1, "2"), + new SparkListenerStageExecutorMetrics("2", 1, 0, + Array(7000L, 70L, 50L, 40L, 10L, 30L, 50L, 60L, 40L, 40L + +// Events to post. 
+val events = Array( + SparkListenerApplicationStart("executionMetrics", None, +1L, "update", None), + createExecutorAddedEvent(1), + createExecutorAddedEvent(2), + createStageSubmittedEvent(0), + // receive 3 metric updates from each executor with just stage 0 running, + // with different peak updates for each executor + createExecutorMetricsUpdateEvent(1, + Array(4000L, 50L, 20L, 0L, 40L, 0L, 60L, 0L, 70L, 20L)), + createExecutorMetricsUpdateEvent(2, + Array(1500L, 50L, 20L, 0L, 0L, 0L, 20L, 0L, 70L, 0L)), + // exec 1: new stage 0 peaks for metrics at indexes: 2, 4, 6 + createExecutorMetricsUpdateEvent(1, + Array(4000L, 50L, 50L, 0L, 50L, 0L, 100L, 0L, 70L, 20L)), + // exec 2: new stage 0 peaks for metrics at indexes: 0, 4, 6 + createExecutorMetricsUpdateEvent(2, + Array(2000L, 50L, 10L, 0L, 10L, 0L, 30L, 0L, 70L, 0L)), + // exec 1: new stage 0 peaks for metrics at indexes: 5, 7 + createExecutorMetricsUpdateEvent(1, + Array(2000L, 40L, 50L, 0L, 40L, 10L, 90L, 10L, 50L, 0L)), + // exec 2: new stage 0 peaks for metrics at indexes: 0, 5, 6, 7, 8 + createExecutorMetricsUpdateEvent(2, + Array(3500L, 50L, 15L, 0L, 10L, 10L, 35L, 10L, 80L, 0L)), + // now start stage 1, one more metric update for each executor, and new + // peaks for some stage 1 metrics (as listed), initialize stage 1 peaks + createStageSubmittedEvent(1), + // exec 1: new stage 0 peaks for metrics at indexes: 0, 3, 7 --- End diff -- Are this comment and the one in line 322 correct? Shouldn't it say stage 1? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r203520320 --- Diff: core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala --- @@ -160,11 +160,29 @@ case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends * Periodic updates from executors. * @param execId executor id * @param accumUpdates sequence of (taskId, stageId, stageAttemptId, accumUpdates) + * @param executorUpdates executor level metrics updates */ @DeveloperApi case class SparkListenerExecutorMetricsUpdate( execId: String, -accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])]) +accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])], +executorUpdates: Option[Array[Long]] = None) + extends SparkListenerEvent + +/** + * Peak metric values for the executor for the stage, written to the history log at stage + * completion. + * @param execId executor id + * @param stageId stage id + * @param stageAttemptId stage attempt + * @param executorMetrics executor level metrics, indexed by MetricGetter.values + */ +@DeveloperApi +case class SparkListenerStageExecutorMetrics( +execId: String, +stageId: Int, +stageAttemptId: Int, +executorMetrics: Array[Long]) --- End diff -- Actually, I just realized EnumMap is always mutable. If we want the public API to be totally immutable, we're back to a private field & constructor, and a getter. So we might as well keep the field an `Array[Long]`, since it's private anyway. IMO it doesn't really matter if `ExecutorMetricType` is an enum or a sealed Scala trait.
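A sketch of the "private field & constructor, and a getter" shape described above, assuming a sealed trait for the metric types. The names `MetricType`, `JvmHeap`, `OnHeapExecution`, and `Metrics`, and the metric name strings, are hypothetical and not Spark's actual definitions.

```scala
// Hypothetical enum-like metric types.
sealed trait MetricType { def name: String }
case object JvmHeap extends MetricType { val name = "JvmHeap" }
case object OnHeapExecution extends MetricType { val name = "OnHeapExecution" }

object MetricType {
  // Position in this sequence is the index into the backing array.
  val values: IndexedSeq[MetricType] = IndexedSeq(JvmHeap, OnHeapExecution)
}

// The array is private and copied on the way in, so callers cannot mutate it afterwards.
final class Metrics private (private val values: Array[Long]) {
  def get(metric: MetricType): Long = values(MetricType.values.indexOf(metric))
}

object Metrics {
  def apply(input: Array[Long]): Metrics = {
    require(input.length == MetricType.values.length, "one value per metric type")
    new Metrics(input.clone())
  }
}
```

Because the trait is sealed, adding a metric type means adding a case object and one entry in `values`, while the public surface stays a single immutable getter.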
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r203503691 --- Diff: core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala --- @@ -160,11 +160,29 @@ case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends * Periodic updates from executors. * @param execId executor id * @param accumUpdates sequence of (taskId, stageId, stageAttemptId, accumUpdates) + * @param executorUpdates executor level metrics updates */ @DeveloperApi case class SparkListenerExecutorMetricsUpdate( execId: String, -accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])]) +accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])], +executorUpdates: Option[Array[Long]] = None) + extends SparkListenerEvent + +/** + * Peak metric values for the executor for the stage, written to the history log at stage + * completion. + * @param execId executor id + * @param stageId stage id + * @param stageAttemptId stage attempt + * @param executorMetrics executor level metrics, indexed by MetricGetter.values + */ +@DeveloperApi +case class SparkListenerStageExecutorMetrics( +execId: String, +stageId: Int, +stageAttemptId: Int, +executorMetrics: Array[Long]) --- End diff -- This sounds like a good solution, with both a clean API, and also an efficient implementation that will be easier to add new metrics to. Thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r203489913 --- Diff: core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala --- @@ -160,11 +160,29 @@ case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends * Periodic updates from executors. * @param execId executor id * @param accumUpdates sequence of (taskId, stageId, stageAttemptId, accumUpdates) + * @param executorUpdates executor level metrics updates */ @DeveloperApi case class SparkListenerExecutorMetricsUpdate( execId: String, -accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])]) +accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])], +executorUpdates: Option[Array[Long]] = None) + extends SparkListenerEvent + +/** + * Peak metric values for the executor for the stage, written to the history log at stage + * completion. + * @param execId executor id + * @param stageId stage id + * @param stageAttemptId stage attempt + * @param executorMetrics executor level metrics, indexed by MetricGetter.values + */ +@DeveloperApi +case class SparkListenerStageExecutorMetrics( +execId: String, +stageId: Int, +stageAttemptId: Int, +executorMetrics: Array[Long]) --- End diff -- I am completely sold on the idea of enum-like. My main concern was around avoiding `MatchError`'s in scala and the other potential failures you elaborated on above. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r203455379 --- Diff: core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala --- @@ -160,11 +160,29 @@ case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends * Periodic updates from executors. * @param execId executor id * @param accumUpdates sequence of (taskId, stageId, stageAttemptId, accumUpdates) + * @param executorUpdates executor level metrics updates */ @DeveloperApi case class SparkListenerExecutorMetricsUpdate( execId: String, -accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])]) +accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])], +executorUpdates: Option[Array[Long]] = None) + extends SparkListenerEvent + +/** + * Peak metric values for the executor for the stage, written to the history log at stage + * completion. + * @param execId executor id + * @param stageId stage id + * @param stageAttemptId stage attempt + * @param executorMetrics executor level metrics, indexed by MetricGetter.values + */ +@DeveloperApi +case class SparkListenerStageExecutorMetrics( +execId: String, +stageId: Int, +stageAttemptId: Int, +executorMetrics: Array[Long]) --- End diff -- Re: compatibility -- the JSON already uses names, so it is not dependent on enum order. We always require the same code for the values sent between the driver & executor, so that isn't a concern. For the user accessing these values with `getMetricValue()` -- deleting an enum would be the same as deleting a field, so yeah, it would break compatibility. That is technically allowed for a `DeveloperApi`, but something we should avoid. Adding enums should be OK. If the enums are re-ordered in the Spark code, but the user compiles against an older version ... I *think* it should be OK, as we'd look up the index in the actual Spark code at runtime.
BTW, I'm using "enum" loosely: I don't actually mean a Java enum, I mean more the general concept, as it's implemented in the current diff: a fixed set of constants, with a helper to get all of them in order. We could switch to using Java enums -- I actually started that (https://github.com/apache/spark/pull/21221/commits/8b74ba8fff21b499e7cc9d93f9864831aa29773e), but changed it in the end. Honestly, I forget why -- I think it was because I decided I wanted Scala scoping constructs and more flexible code organization or something along those lines, and Java's enum didn't really buy me much more. The `executorMetrics` field here is basically an `EnumMap`, but it can actually do primitives. That matters more for the internal messages than here in the public Spark listener API. Anyway, I think there *does* need to be some developer API which exposes all of the MetricGetter/ExecutorMetricType values. I don't really care whether that is a Java enum or the home-grown version here. I'm flexible on specifics, but my suggestion: an `ExecutorMetricType` enum that is a `DeveloperApi`; make the SparkListener API expose the values as `executorMetrics: EnumMap`; internally, still use `Array[Long]`, and have `MetricGetter` contain the code which knows how to take an `ExecutorMetricType` and determine its current value. That would make the public API more self-documenting and clean, keep things efficient and clean internally, and also allow us to separate out the public list of metrics from the internal logic to determine current values, without having to play too many games with package-private access. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
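The point about the JSON using names rather than positions can be illustrated with a small sketch. Everything here is an assumption made for illustration: `NamedMetrics`, the metric names in the usage note, and the choice to default a missing metric to 0 are not taken from the PR.

```scala
// Hypothetical helper; shows why name-keyed storage survives re-ordering.
object NamedMetrics {
  /** Pair each value with its metric name, using the writer's ordering. */
  def toNamed(order: Seq[String], values: Array[Long]): Map[String, Long] = {
    require(order.length == values.length, "one value per metric name")
    order.zip(values.toSeq).toMap
  }

  /**
   * Rebuild an array in the reader's ordering. A metric the writer did not
   * know about defaults to 0 (an assumption of this sketch).
   */
  def fromNamed(order: Seq[String], named: Map[String, Long]): Array[Long] =
    order.map(name => named.getOrElse(name, 0L)).toArray
}
```

For example, a log written with the order `heap, offHeap, direct` can be read back by code whose order is `direct, heap, mapped, offHeap`: each known value lands at the reader's index for that name, and the added `mapped` metric falls back to the default.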
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r203319952 --- Diff: core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala --- @@ -160,11 +160,29 @@ case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends * Periodic updates from executors. * @param execId executor id * @param accumUpdates sequence of (taskId, stageId, stageAttemptId, accumUpdates) + * @param executorUpdates executor level metrics updates */ @DeveloperApi case class SparkListenerExecutorMetricsUpdate( execId: String, -accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])]) +accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])], +executorUpdates: Option[Array[Long]] = None) + extends SparkListenerEvent + +/** + * Peak metric values for the executor for the stage, written to the history log at stage + * completion. + * @param execId executor id + * @param stageId stage id + * @param stageAttemptId stage attempt + * @param executorMetrics executor level metrics, indexed by MetricGetter.values + */ +@DeveloperApi +case class SparkListenerStageExecutorMetrics( +execId: String, +stageId: Int, +stageAttemptId: Int, +executorMetrics: Array[Long]) --- End diff -- +1 on enum's @squito ! The only concern would be evolving the enum's in a later release - changing enum could result in source incompatibility. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r203122722 --- Diff: core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala --- @@ -160,11 +160,29 @@ case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends * Periodic updates from executors. * @param execId executor id * @param accumUpdates sequence of (taskId, stageId, stageAttemptId, accumUpdates) + * @param executorUpdates executor level metrics updates */ @DeveloperApi case class SparkListenerExecutorMetricsUpdate( execId: String, -accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])]) +accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])], +executorUpdates: Option[Array[Long]] = None) + extends SparkListenerEvent + +/** + * Peak metric values for the executor for the stage, written to the history log at stage + * completion. + * @param execId executor id + * @param stageId stage id + * @param stageAttemptId stage attempt + * @param executorMetrics executor level metrics, indexed by MetricGetter.values + */ +@DeveloperApi +case class SparkListenerStageExecutorMetrics( +execId: String, +stageId: Int, +stageAttemptId: Int, +executorMetrics: Array[Long]) --- End diff -- Adding getMetricValue() would abstract away the array implementation. Should MetricGetter/ExecutorMetricType be public? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r203100978 --- Diff: core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala --- @@ -160,11 +160,29 @@ case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends * Periodic updates from executors. * @param execId executor id * @param accumUpdates sequence of (taskId, stageId, stageAttemptId, accumUpdates) + * @param executorUpdates executor level metrics updates */ @DeveloperApi case class SparkListenerExecutorMetricsUpdate( execId: String, -accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])]) +accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])], +executorUpdates: Option[Array[Long]] = None) + extends SparkListenerEvent + +/** + * Peak metric values for the executor for the stage, written to the history log at stage + * completion. + * @param execId executor id + * @param stageId stage id + * @param stageAttemptId stage attempt + * @param executorMetrics executor level metrics, indexed by MetricGetter.values + */ +@DeveloperApi +case class SparkListenerStageExecutorMetrics( +execId: String, +stageId: Int, +stageAttemptId: Int, +executorMetrics: Array[Long]) --- End diff -- Sorry, I was out for a bit, catching up now -- so I think these are valid concerns, but I'd really like to have an approach which avoids all the `getFooBar()` methods -- it's more error-prone internally with all the places you've got to make sure you match things up exactly, and even for external consumers it's nice to have an easy way to iterate over everything. I think we can satisfy all concerns by making the field & constructor private and adding a `getMetricValue(metric: MetricGetter)`. Perhaps we should also rename `MetricGetter` -- externally it's more like `ExecutorMetricType`. Users shouldn't need to know anything about its ability to grab the value of the metric in a running executor.
I'm not totally stuck on this, but I'd certainly appreciate it -- I'm very partial to enums, but realize not everyone sees the beauty of them like I do :) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
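The "private field & constructor plus `getMetricValue`" idea can be sketched as follows. This is an illustration under stated assumptions, not the code that was merged: the metric names `HeapUsed`, `OffHeapUsed`, and `DirectPoolUsed` and the `StageExecutorMetrics` class are hypothetical, and only the names `ExecutorMetricType` and `getMetricValue` come from the discussion above.

```scala
// Hypothetical sketch; names other than ExecutorMetricType and
// getMetricValue are invented for illustration.
sealed trait ExecutorMetricType { def name: String }
case object HeapUsed extends ExecutorMetricType { val name = "HeapUsed" }
case object OffHeapUsed extends ExecutorMetricType { val name = "OffHeapUsed" }
case object DirectPoolUsed extends ExecutorMetricType { val name = "DirectPoolUsed" }

object ExecutorMetricType {
  /** Fixed, ordered set of metric types: the "enum-like" part. */
  val values: IndexedSeq[ExecutorMetricType] =
    IndexedSeq(HeapUsed, OffHeapUsed, DirectPoolUsed)

  private val indexOf: Map[ExecutorMetricType, Int] = values.zipWithIndex.toMap

  /** Position of a metric in the internal array, looked up at runtime. */
  def index(metric: ExecutorMetricType): Int = indexOf(metric)
}

/** The array stays private, so callers cannot mutate it or rely on its order. */
final class StageExecutorMetrics private (metrics: Array[Long]) {
  def getMetricValue(metric: ExecutorMetricType): Long =
    metrics(ExecutorMetricType.index(metric))
}

object StageExecutorMetrics {
  def apply(values: Array[Long]): StageExecutorMetrics = {
    require(values.length == ExecutorMetricType.values.length, "one value per metric type")
    // Defensive copy: later writes to the caller's array are not visible here.
    new StageExecutorMetrics(values.clone())
  }
}
```

A consumer can still iterate over everything with `ExecutorMetricType.values.map(m.getMetricValue)`, which addresses the wish for easy iteration without exposing the array itself.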
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r200826235 --- Diff: core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala --- @@ -160,11 +160,29 @@ case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends * Periodic updates from executors. * @param execId executor id * @param accumUpdates sequence of (taskId, stageId, stageAttemptId, accumUpdates) + * @param executorUpdates executor level metrics updates */ @DeveloperApi case class SparkListenerExecutorMetricsUpdate( execId: String, -accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])]) +accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])], +executorUpdates: Option[Array[Long]] = None) + extends SparkListenerEvent + +/** + * Peak metric values for the executor for the stage, written to the history log at stage + * completion. + * @param execId executor id + * @param stageId stage id + * @param stageAttemptId stage attempt + * @param executorMetrics executor level metrics, indexed by MetricGetter.values + */ +@DeveloperApi +case class SparkListenerStageExecutorMetrics( +execId: String, +stageId: Int, +stageAttemptId: Int, +executorMetrics: Array[Long]) --- End diff -- We can change back to using an ExecutorMetrics class in this case. The plan was for any new metrics to be added to the end, so that there wouldn't be any change in ordering, and executorMetrics could be changed to immutable Seq[Long], but there would still be the issue of having to reference MetricGetter to find out how the metrics are indexed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r200805499 --- Diff: core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala --- @@ -160,11 +160,29 @@ case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends * Periodic updates from executors. * @param execId executor id * @param accumUpdates sequence of (taskId, stageId, stageAttemptId, accumUpdates) + * @param executorUpdates executor level metrics updates */ @DeveloperApi case class SparkListenerExecutorMetricsUpdate( execId: String, -accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])]) +accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])], +executorUpdates: Option[Array[Long]] = None) + extends SparkListenerEvent + +/** + * Peak metric values for the executor for the stage, written to the history log at stage + * completion. + * @param execId executor id + * @param stageId stage id + * @param stageAttemptId stage attempt + * @param executorMetrics executor level metrics, indexed by MetricGetter.values + */ +@DeveloperApi +case class SparkListenerStageExecutorMetrics( +execId: String, +stageId: Int, +stageAttemptId: Int, +executorMetrics: Array[Long]) --- End diff -- We cannot expose an array of longs in a developer API (mutability). In addition, we cannot have users needing to reference private Spark APIs to understand its meaning, particularly when the ordering can be subject to change in subsequent versions of Spark. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r198815695 --- Diff: core/src/main/scala/org/apache/spark/scheduler/PeakExecutorMetrics.scala --- @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import org.apache.spark.executor.ExecutorMetrics +import org.apache.spark.status.api.v1.PeakMemoryMetrics + +/** + * Records the peak values for executor level metrics. If jvmUsedHeapMemory is -1, then no + * values have been recorded yet. + */ +private[spark] class PeakExecutorMetrics { --- End diff -- With ExecutorMetrics removed, it seems useful to have a class for tracking and setting peak metric values, that can be used by both EventLoggingListener and AppStatusListener. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r198684121 --- Diff: project/MimaExcludes.scala --- @@ -89,7 +89,13 @@ object MimaExcludes { ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasValidationIndicatorCol.validationIndicatorCol"), ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasValidationIndicatorCol.getValidationIndicatorCol"), ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasValidationIndicatorCol.org$apache$spark$ml$param$shared$HasValidationIndicatorCol$_setter_$validationIndicatorCol_="), - ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasValidationIndicatorCol.validationIndicatorCol") + ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasValidationIndicatorCol.validationIndicatorCol"), + +// [SPARK-23429][CORE] Add executor memory metrics to heartbeat and expose in executors REST API + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate.apply"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate.copy"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate.this"), + ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate$") --- End diff -- Will move up. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r198683846 --- Diff: core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala --- @@ -251,6 +261,217 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit } } + /** + * Test executor metrics update logging functionality. This checks that a + * SparkListenerExecutorMetricsUpdate event is added to the Spark history + * log if one of the executor metrics is larger than any previously + * recorded value for the metric, per executor per stage. The task metrics --- End diff -- Woops, that was left over from when it was ExecutorMetricsUpdated. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r198683408 --- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala --- @@ -98,14 +102,48 @@ class ExecutorSummary private[spark]( val removeReason: Option[String], val executorLogs: Map[String, String], val memoryMetrics: Option[MemoryMetrics], -val blacklistedInStages: Set[Int]) +val blacklistedInStages: Set[Int], +@JsonSerialize(using = classOf[PeakMemoryMetricsSerializer]) +@JsonDeserialize(using = classOf[PeakMemoryMetricsDeserializer]) +val peakMemoryMetrics: Option[Array[Long]]) class MemoryMetrics private[spark]( val usedOnHeapStorageMemory: Long, val usedOffHeapStorageMemory: Long, val totalOnHeapStorageMemory: Long, val totalOffHeapStorageMemory: Long) +/** deserialzer for peakMemoryMetrics: convert to array ordered by metric name */ +class PeakMemoryMetricsDeserializer private[spark] extends JsonDeserializer[Option[Array[Long]]] { --- End diff -- This is odd, but I can't seem to comment on your earlier comment. Regarding having a serializer/deserializer, I also don't have strong feelings -- it makes it more readable, but also takes up more space in the history log. Regarding this comment, thanks, I hadn't realized the placement meant that it marked the constructor. It's meant for the class, and I'll move. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r198682917 --- Diff: core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala --- @@ -264,6 +282,11 @@ private[spark] trait SparkListenerInterface { */ def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit + /** + * Called when the driver reads stage executor metrics from the history log. --- End diff -- Updated. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r198682980 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -669,6 +686,29 @@ private[spark] class AppStatusListener( } } } +event.executorUpdates.foreach { updates: Array[Long] => + // check if there is a new peak value for any of the executor level memory metrics + liveExecutors.get(event.execId).foreach { exec: LiveExecutor => +if (exec.peakExecutorMetrics.compareAndUpdate(updates)) { + maybeUpdate(exec, now) +} + } +} + } + + override def onStageExecutorMetrics(executorMetrics: SparkListenerStageExecutorMetrics): Unit = { +val now = System.nanoTime() + +// check if there is a new peak value for any of the executor level memory metrics --- End diff -- Unfortunately, yes. I've added some comments. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r198682809 --- Diff: core/src/main/scala/org/apache/spark/metrics/MetricGetter.scala --- @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.metrics + +import java.lang.management.{BufferPoolMXBean, ManagementFactory} +import javax.management.ObjectName + +import org.apache.spark.memory.MemoryManager + +sealed trait MetricGetter { --- End diff -- Added. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r198682884 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -169,6 +181,28 @@ private[spark] class EventLoggingListener( // Events that trigger a flush override def onStageCompleted(event: SparkListenerStageCompleted): Unit = { +if (shouldLogExecutorMetricsUpdates) { + // clear out any previous attempts, that did not have a stage completed event + val prevAttemptId = event.stageInfo.attemptNumber() - 1 + for (attemptId <- 0 to prevAttemptId) { +liveStageExecutorMetrics.remove((event.stageInfo.stageId, attemptId)) + } + + // log the peak executor metrics for the stage, for each live executor, + // whether or not the executor is running tasks for the stage + val executorMap = liveStageExecutorMetrics.remove( +(event.stageInfo.stageId, event.stageInfo.attemptNumber())) + executorMap.foreach { + executorEntry => { + for ((executorId, peakExecutorMetrics) <- executorEntry) { --- End diff -- Yes, the naming is confusing. Changed to the 1st option. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r198682779 --- Diff: core/src/main/scala/org/apache/spark/Heartbeater.scala --- @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.concurrent.TimeUnit + +import org.apache.spark.internal.Logging +import org.apache.spark.memory.MemoryManager +import org.apache.spark.metrics.MetricGetter +import org.apache.spark.util.{ThreadUtils, Utils} + +/** + * Creates a heartbeat thread which will call the specified reportHeartbeat function at + * intervals of intervalMs. + * + * @param memoryManager the memory manager for execution and storage memory. + * @param reportHeartbeat the heartbeat reporting function to call. + * @param name the thread name for the heartbeater. + * @param intervalMs the interval between heartbeats. + */ +private[spark] class Heartbeater( +memoryManager: MemoryManager, +reportHeartbeat: () => Unit, +name: String, +intervalMs: Long) extends Logging { + // Executor for the heartbeat task + private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor(name) + + /** Schedules a task to report a heartbeat. 
*/ + private[spark] def start(): Unit = { --- End diff -- Removed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
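The doc comment quoted above describes a thread that calls `reportHeartbeat` every `intervalMs`. A minimal sketch of that shape follows, assuming a plain JDK `ScheduledExecutorService` in place of Spark's `ThreadUtils` helper. The class name `SimpleHeartbeater`, the zero initial delay, and the 10-second wait in `stop()` are choices of this sketch, not details confirmed by the diff.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

// Hypothetical stand-in for the PR's Heartbeater; uses only JDK classes.
final class SimpleHeartbeater(reportHeartbeat: () => Unit, intervalMs: Long) {
  private val scheduler: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor()

  /** Schedules reportHeartbeat to run every intervalMs, starting immediately. */
  def start(): Unit = {
    val task = new Runnable {
      // Note: if run() throws, scheduleAtFixedRate cancels later executions,
      // so a real implementation would catch and log exceptions here.
      override def run(): Unit = reportHeartbeat()
    }
    scheduler.scheduleAtFixedRate(task, 0L, intervalMs, TimeUnit.MILLISECONDS)
    ()
  }

  /** Stops scheduling and waits up to 10 seconds; true if the thread finished. */
  def stop(): Boolean = {
    scheduler.shutdown()
    scheduler.awaitTermination(10, TimeUnit.SECONDS)
  }
}
```

Wrapping shutdown and the bounded wait in a single `stop()` keeps callers from having to repeat both steps at every shutdown site.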
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r198682787 --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala --- @@ -1922,6 +1928,12 @@ class SparkContext(config: SparkConf) extends Logging { Utils.tryLogNonFatalError { _eventLogger.foreach(_.stop()) } +if(_heartbeater != null) { --- End diff -- Added. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r198611581 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -169,6 +181,28 @@ private[spark] class EventLoggingListener( // Events that trigger a flush override def onStageCompleted(event: SparkListenerStageCompleted): Unit = { +if (shouldLogExecutorMetricsUpdates) { + // clear out any previous attempts, that did not have a stage completed event + val prevAttemptId = event.stageInfo.attemptNumber() - 1 + for (attemptId <- 0 to prevAttemptId) { +liveStageExecutorMetrics.remove((event.stageInfo.stageId, attemptId)) + } + + // log the peak executor metrics for the stage, for each live executor, + // whether or not the executor is running tasks for the stage + val executorMap = liveStageExecutorMetrics.remove( +(event.stageInfo.stageId, event.stageInfo.attemptNumber())) + executorMap.foreach { + executorEntry => { + for ((executorId, peakExecutorMetrics) <- executorEntry) { --- End diff -- couple of minor things -- I wouldn't use `executorMap` for the `Option` and then `executorEntry` for the Map. Also, we tend to prefer using `foreach` over scala's `for` loop, the one exception is that it can clean up extra nesting when you've got a bunch of loops (which you actually do here). So I'd go with

```scala
val executorOpt = liveStageExecutorMetrics.remove(
  (event.stageInfo.stageId, event.stageInfo.attemptNumber()))
executorOpt.foreach { execMap =>
  execMap.foreach { case (executorId, peakExecutorMetrics) =>
    ...
  }
}
```

or if you want to use the for loop, use it around everything:

```scala
val execOpt = liveStageExecutorMetrics.remove(
  (event.stageInfo.stageId, event.stageInfo.attemptNumber()))
for {
  execMap <- execOpt
  (executorId, peakExecutorMetrics) <- execMap
} {
  ...
}
```

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
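The stage-completion logic in the quoted diff can be exercised on its own. The sketch below is a self-contained approximation, assuming a hypothetical `StageCompletionSketch` object and plain `Array[Long]` peaks instead of the PR's `PeakExecutorMetrics`. It returns the per-executor peaks rather than logging them, and sorts by executor id only to make the result deterministic.

```scala
import scala.collection.mutable

// Hypothetical, standalone approximation of the onStageCompleted bookkeeping.
object StageCompletionSketch {
  type StageKey = (Int, Int) // (stageId, stageAttemptId)
  type PeaksByExecutor = mutable.HashMap[String, Array[Long]]

  def drainCompletedStage(
      live: mutable.HashMap[StageKey, PeaksByExecutor],
      stageId: Int,
      attemptNumber: Int): Seq[(String, Vector[Long])] = {
    // Drop earlier attempts that never produced a stage-completed event.
    (0 until attemptNumber).foreach(attempt => live.remove((stageId, attempt)))

    // remove returns an Option, so a stage with no recorded metrics yields nothing.
    val execOpt = live.remove((stageId, attemptNumber))
    for {
      execMap <- execOpt.toList
      (executorId, peaks) <- execMap.toSeq.sortBy(_._1)
    } yield (executorId, peaks.toVector)
  }
}
```

This follows the second form suggested in the review, with a single `for` around both the `Option` and the map, so the nesting stays flat.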
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r198628058 --- Diff: project/MimaExcludes.scala --- @@ -89,7 +89,13 @@ object MimaExcludes { ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasValidationIndicatorCol.validationIndicatorCol"), ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasValidationIndicatorCol.getValidationIndicatorCol"), ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasValidationIndicatorCol.org$apache$spark$ml$param$shared$HasValidationIndicatorCol$_setter_$validationIndicatorCol_="), - ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasValidationIndicatorCol.validationIndicatorCol") + ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasValidationIndicatorCol.validationIndicatorCol"), + +// [SPARK-23429][CORE] Add executor memory metrics to heartbeat and expose in executors REST API + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate.apply"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate.copy"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate.this"), + ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate$") --- End diff -- minor: I hear we put new ones at the top now (so you don't have to add a comma to the previous one) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r198618559 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -669,6 +686,29 @@ private[spark] class AppStatusListener( } } } +event.executorUpdates.foreach { updates: Array[Long] => + // check if there is a new peak value for any of the executor level memory metrics + liveExecutors.get(event.execId).foreach { exec: LiveExecutor => +if (exec.peakExecutorMetrics.compareAndUpdate(updates)) { + maybeUpdate(exec, now) +} + } +} + } + + override def onStageExecutorMetrics(executorMetrics: SparkListenerStageExecutorMetrics): Unit = { +val now = System.nanoTime() + +// check if there is a new peak value for any of the executor level memory metrics --- End diff -- Do you need this here *and* in `onExecutorMetricsUpdate`? I guess you do, because one is for reading from the logs and the other is for the live UI? If so, it's worth adding a comment about that.
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r198625661 --- Diff: core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala --- @@ -251,6 +261,217 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit } } + /** + * Test executor metrics update logging functionality. This checks that a + * SparkListenerExecutorMetricsUpdate event is added to the Spark history + * log if one of the executor metrics is larger than any previously + * recorded value for the metric, per executor per stage. The task metrics --- End diff -- I think you mean *executor* metrics, not *task* metrics, right?
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r198624068 --- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala --- @@ -98,14 +101,53 @@ class ExecutorSummary private[spark]( val removeReason: Option[String], val executorLogs: Map[String, String], val memoryMetrics: Option[MemoryMetrics], -val blacklistedInStages: Set[Int]) +val blacklistedInStages: Set[Int], +@JsonSerialize(using = classOf[PeakMemoryMetricsSerializer]) +@JsonDeserialize(using = classOf[PeakMemoryMetricsDeserializer]) +val peakMemoryMetrics: Option[Array[Long]]) class MemoryMetrics private[spark]( val usedOnHeapStorageMemory: Long, val usedOffHeapStorageMemory: Long, val totalOnHeapStorageMemory: Long, val totalOffHeapStorageMemory: Long) +/** deserialzer for peakMemoryMetrics: convert to array ordered by metric name */ +class PeakMemoryMetricsDeserializer extends JsonDeserializer[Option[Array[Long]]] { + override def deserialize( + jsonParser: JsonParser, + deserializationContext: DeserializationContext): Option[Array[Long]] = { +val metricsMap = jsonParser.readValueAs(classOf[Option[Map[String, Object]]]) +metricsMap match { + case Some(metrics) => +Some(MetricGetter.values.map { m => + metrics.getOrElse (m.name, 0L) match { +case intVal: Int => intVal.toLong +case longVal: Long => longVal + } +}.toArray) + case None => None +} + } +} + +/** serializer for peakMemoryMetrics: convert array to map with metric name as key */ +class PeakMemoryMetricsSerializer extends JsonSerializer[Option[Array[Long]]] { + override def serialize( + metrics: Option[Array[Long]], + jsonGenerator: JsonGenerator, + serializerProvider: SerializerProvider): Unit = { +metrics match { + case Some(m) => +val metricsMap = (0 until MetricGetter.values.length).map { idx => --- End diff -- I don't really feel strongly. If you don't find it helpful here, then we might as well get rid of it; it's just a convenience meant for cases like this.
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r198610162 --- Diff: core/src/main/scala/org/apache/spark/metrics/MetricGetter.scala --- @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.metrics + +import java.lang.management.{BufferPoolMXBean, ManagementFactory} +import javax.management.ObjectName + +import org.apache.spark.memory.MemoryManager + +sealed trait MetricGetter { --- End diff -- `private[spark]` on all of these classes --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r198624819 --- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala --- @@ -98,14 +102,48 @@ class ExecutorSummary private[spark]( val removeReason: Option[String], val executorLogs: Map[String, String], val memoryMetrics: Option[MemoryMetrics], -val blacklistedInStages: Set[Int]) +val blacklistedInStages: Set[Int], +@JsonSerialize(using = classOf[PeakMemoryMetricsSerializer]) +@JsonDeserialize(using = classOf[PeakMemoryMetricsDeserializer]) +val peakMemoryMetrics: Option[Array[Long]]) class MemoryMetrics private[spark]( val usedOnHeapStorageMemory: Long, val usedOffHeapStorageMemory: Long, val totalOnHeapStorageMemory: Long, val totalOffHeapStorageMemory: Long) +/** deserialzer for peakMemoryMetrics: convert to array ordered by metric name */ +class PeakMemoryMetricsDeserializer private[spark] extends JsonDeserializer[Option[Array[Long]]] { + override def deserialize( + jsonParser: JsonParser, + deserializationContext: DeserializationContext): Option[Array[Long]] = { +val metricsMap = jsonParser.readValueAs[Option[Map[String, Long]]]( + new TypeReference[Option[Map[String, java.lang.Long]]] {}) +metricsMap match { + case Some(metrics) => +Some(MetricGetter.values.map(m => metrics.getOrElse(m.name, 0L)).toArray) + case None => None +} + } +} +/** serializer for peakMemoryMetrics: convert array to map with metric name as key */ +class PeakMemoryMetricsSerializer private[spark] extends JsonSerializer[Option[Array[Long]]] { --- End diff -- same here --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
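The array-to-map conversion that the serializer and deserializer in this diff perform can be sketched without Jackson. This is a minimal sketch under stated assumptions: `PeakMetricsCodec` and the three metric names are hypothetical placeholders for whatever `MetricGetter.values` provides; only the shape of the round trip (positions in the array keyed by metric name, missing names defaulting to 0) follows the diff.

```scala
object PeakMetricsCodec {
  // Hypothetical metric names; their order defines the array positions.
  val metricNames: Vector[String] =
    Vector("JVMHeapMemory", "JVMOffHeapMemory", "OnHeapExecutionMemory")

  // Serializer direction: positional array -> map keyed by metric name.
  def toNamedMap(values: Array[Long]): Map[String, Long] = {
    require(values.length == metricNames.length, "one value per metric expected")
    metricNames.zip(values.toSeq).toMap
  }

  // Deserializer direction: map -> array in metric order,
  // using 0 for any metric missing from the map.
  def fromNamedMap(named: Map[String, Long]): Array[Long] =
    metricNames.map(name => named.getOrElse(name, 0L)).toArray
}
```

Defaulting missing names to 0 mirrors the `getOrElse(m.name, 0L)` call in the diff, which lets older payloads that lack a newer metric still deserialize.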
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r198613341 --- Diff: core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala --- @@ -264,6 +282,11 @@ private[spark] trait SparkListenerInterface { */ def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit + /** + * Called when the driver reads stage executor metrics from the history log. --- End diff -- Called with the peak memory metrics for a given (executor, stage) combination. Note that this is only present when reading from the event log (as in the history server), and is never called in a live application. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r198609741 --- Diff: core/src/main/scala/org/apache/spark/Heartbeater.scala --- @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.concurrent.TimeUnit + +import org.apache.spark.internal.Logging +import org.apache.spark.memory.MemoryManager +import org.apache.spark.metrics.MetricGetter +import org.apache.spark.util.{ThreadUtils, Utils} + +/** + * Creates a heartbeat thread which will call the specified reportHeartbeat function at + * intervals of intervalMs. + * + * @param memoryManager the memory manager for execution and storage memory. + * @param reportHeartbeat the heartbeat reporting function to call. + * @param name the thread name for the heartbeater. + * @param intervalMs the interval between heartbeats. + */ +private[spark] class Heartbeater( +memoryManager: MemoryManager, +reportHeartbeat: () => Unit, +name: String, +intervalMs: Long) extends Logging { + // Executor for the heartbeat task + private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor(name) + + /** Schedules a task to report a heartbeat. 
*/ + private[spark] def start(): Unit = { --- End diff -- nit: the whole class is `private[spark]`, so you don't need to add this to individual methods.
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r198624800 --- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala --- @@ -98,14 +102,48 @@ class ExecutorSummary private[spark]( val removeReason: Option[String], val executorLogs: Map[String, String], val memoryMetrics: Option[MemoryMetrics], -val blacklistedInStages: Set[Int]) +val blacklistedInStages: Set[Int], +@JsonSerialize(using = classOf[PeakMemoryMetricsSerializer]) +@JsonDeserialize(using = classOf[PeakMemoryMetricsDeserializer]) +val peakMemoryMetrics: Option[Array[Long]]) class MemoryMetrics private[spark]( val usedOnHeapStorageMemory: Long, val usedOffHeapStorageMemory: Long, val totalOnHeapStorageMemory: Long, val totalOffHeapStorageMemory: Long) +/** deserialzer for peakMemoryMetrics: convert to array ordered by metric name */ +class PeakMemoryMetricsDeserializer private[spark] extends JsonDeserializer[Option[Array[Long]]] { --- End diff -- `private[spark] class ...` I'm not sure what `private[spark]` does in the place you have it -- it might mark the constructor. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
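The guess in this comment matches Scala's rules: a modifier written after the class name applies to the primary constructor, while a modifier written before `class` applies to the class itself. A minimal sketch follows; the enclosing object named `spark` is a hypothetical stand-in for the `org.apache.spark` package so that the qualifier has a scope to refer to, and all class names are made up for illustration.

```scala
// `spark` is a hypothetical enclosing object standing in for the
// org.apache.spark package, giving `private[spark]` a scope to name.
object spark {
  // Modifier AFTER the class name: the class is public, but its
  // primary constructor can only be called from inside `spark`.
  class CtorRestricted private[spark] (val label: String)

  // Modifier BEFORE `class`: the class itself is only visible inside `spark`.
  private[spark] class ClassRestricted(val label: String)

  // Code inside the scope may construct both.
  def makeCtorRestricted(label: String): CtorRestricted = new CtorRestricted(label)
  def classRestrictedLabel(label: String): String = new ClassRestricted(label).label
}

object AccessDemo {
  // The type CtorRestricted can be named out here...
  val viaFactory: spark.CtorRestricted = spark.makeCtorRestricted("ok")
  // ...but neither of these would compile outside `spark`:
  //   new spark.CtorRestricted("x")   // constructor is not accessible
  //   new spark.ClassRestricted("x")  // class is not accessible
}
```

So `class PeakMemoryMetricsDeserializer private[spark] extends ...` leaves the class public and restricts only its constructor, which is why the review asks for `private[spark] class ...` instead.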
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r198611872 --- Diff: core/src/main/scala/org/apache/spark/scheduler/PeakExecutorMetrics.scala --- @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import org.apache.spark.executor.ExecutorMetrics +import org.apache.spark.status.api.v1.PeakMemoryMetrics + +/** + * Records the peak values for executor level metrics. If jvmUsedHeapMemory is -1, then no + * values have been recorded yet. + */ +private[spark] class PeakExecutorMetrics { --- End diff -- ping --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r198609800 --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala --- @@ -1922,6 +1928,12 @@ class SparkContext(config: SparkConf) extends Logging { Utils.tryLogNonFatalError { _eventLogger.foreach(_.stop()) } +if(_heartbeater != null) { --- End diff -- nit: add a space before `(`
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r196236364 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -169,6 +182,31 @@ private[spark] class EventLoggingListener( // Events that trigger a flush override def onStageCompleted(event: SparkListenerStageCompleted): Unit = { +if (shouldLogExecutorMetricsUpdates) { + // clear out any previous attempts, that did not have a stage completed event + val prevAttemptId = event.stageInfo.attemptNumber() - 1 + for (attemptId <- 0 to prevAttemptId) { +liveStageExecutorMetrics.remove((event.stageInfo.stageId, attemptId)) + } + + // log the peak executor metrics for the stage, for each live executor, + // whether or not the executor is running tasks for the stage + val accumUpdates = new ArrayBuffer[(Long, Int, Int, Seq[AccumulableInfo])]() + val executorMap = liveStageExecutorMetrics.remove( +(event.stageInfo.stageId, event.stageInfo.attemptNumber())) + executorMap.foreach { + executorEntry => { + for ((executorId, peakExecutorMetrics) <- executorEntry) { +val executorMetrics = new ExecutorMetrics(-1, peakExecutorMetrics.metrics) --- End diff -- Just discussed with @squito -- thanks! Logging -1 for timestamp is confusing and hacky. Some items discussed: For ExecutorMetrics, timestamp can be optional, or it can be removed completely and replaced by Array[Long], with comments explaining how the metrics work. For logging, stage ID could be added as part of the executorMetrics in SparkListenerExecutorMetricsUpdate, but this is awkward, since this information isn't used as part of the Heartbeat, and only for logging. While the application is running, there could be multiple stages running when the metrics are gathered, so specifying 1 stage ID doesn't make sense. For logging, the metrics are the peak values for a particular stage, so are associated with a stage. 
Another option is to add the information to SparkListenerStageCompleted, but this would bloat the event if there are many executors. A third option is to create a new event, SparkListenerStageExecutorMetrics, which would have the executor ID, stage ID and attempt, and peak metrics. I'll give the third option a try and will add details to the design doc once this is more finalized.
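A rough sketch of what the third option could look like, assuming one event per (executor, stage attempt) emitted when the stage completes. `StageExecutorMetricsEvent`, its field names, and the `forCompletedStage` helper are hypothetical; the comment above only states that the new event would carry the executor ID, stage ID and attempt, and peak metrics.

```scala
// Hypothetical event shape: one record per executor for a completed stage attempt.
final case class StageExecutorMetricsEvent(
    execId: String,
    stageId: Int,
    stageAttemptId: Int,
    peakMetrics: Seq[Long])

object StageExecutorMetricsEvent {
  // Builds one event per executor from the peaks tracked for a stage attempt.
  // Sorting by executor ID is only for deterministic output in this sketch.
  def forCompletedStage(
      stageId: Int,
      stageAttemptId: Int,
      peaksByExecutor: Map[String, Array[Long]]): Seq[StageExecutorMetricsEvent] =
    peaksByExecutor.toSeq.sortBy(_._1).map { case (execId, peaks) =>
      StageExecutorMetricsEvent(execId, stageId, stageAttemptId, peaks.toVector)
    }
}
```

Keeping the stage identifiers on a dedicated event avoids both concerns raised above: the heartbeat payload stays free of stage IDs, and SparkListenerStageCompleted does not grow with the number of executors.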
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r19695 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -169,6 +182,31 @@ private[spark] class EventLoggingListener( // Events that trigger a flush override def onStageCompleted(event: SparkListenerStageCompleted): Unit = { +if (shouldLogExecutorMetricsUpdates) { + // clear out any previous attempts, that did not have a stage completed event + val prevAttemptId = event.stageInfo.attemptNumber() - 1 + for (attemptId <- 0 to prevAttemptId) { +liveStageExecutorMetrics.remove((event.stageInfo.stageId, attemptId)) + } + + // log the peak executor metrics for the stage, for each live executor, + // whether or not the executor is running tasks for the stage + val accumUpdates = new ArrayBuffer[(Long, Int, Int, Seq[AccumulableInfo])]() + val executorMap = liveStageExecutorMetrics.remove( +(event.stageInfo.stageId, event.stageInfo.attemptNumber())) + executorMap.foreach { + executorEntry => { + for ((executorId, peakExecutorMetrics) <- executorEntry) { +val executorMetrics = new ExecutorMetrics(-1, peakExecutorMetrics.metrics) --- End diff -- There is no point in logging the timestamp if we only ever log -1. Better to just remove it. Are you doing anything with it in the live UI? Or should we just get rid of the timestamp field entirely? (It can always be added later if there is a use for it.) I agree that having one timestamp for the peak across all metrics isn't very useful. It's possible *all* the metrics would hit a peak at nearly the same time, but the more metrics we add, the less likely that becomes. And even if we had the timestamp for each metric, what would we do with it?
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r195957438 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -169,6 +182,31 @@ private[spark] class EventLoggingListener( // Events that trigger a flush override def onStageCompleted(event: SparkListenerStageCompleted): Unit = { +if (shouldLogExecutorMetricsUpdates) { + // clear out any previous attempts, that did not have a stage completed event + val prevAttemptId = event.stageInfo.attemptNumber() - 1 + for (attemptId <- 0 to prevAttemptId) { +liveStageExecutorMetrics.remove((event.stageInfo.stageId, attemptId)) + } + + // log the peak executor metrics for the stage, for each live executor, + // whether or not the executor is running tasks for the stage + val accumUpdates = new ArrayBuffer[(Long, Int, Int, Seq[AccumulableInfo])]() + val executorMap = liveStageExecutorMetrics.remove( +(event.stageInfo.stageId, event.stageInfo.attemptNumber())) + executorMap.foreach { + executorEntry => { + for ((executorId, peakExecutorMetrics) <- executorEntry) { +val executorMetrics = new ExecutorMetrics(-1, peakExecutorMetrics.metrics) --- End diff -- The last timestamp seems like it wouldn't have enough information, since peaks for different metrics could occur at different times, and with different combinations of stages running. Only -1 would be logged. Right now it's writing out SparkListenerExecutorMetricsUpdate events, which contain ExecutorMetrics, which has timestamp. Do you think timestamp should be removed from ExecutorMetrics? It seems good to have the timestamp for when the metrics were gathered, but it's not being exposed at this point. For both the history server and the live UI, the goal is to show the peak value for each metric each executor. For the executors tab, this is the peak value of each metric over the lifetime of the executor. 
For the stages tab, this is the peak value for each metric for that executor while the stage is running. The executor could be processing tasks for other stages as well, if there are concurrent stages, or no tasks for this stage if it isn't assigned any, but these are the peak values between the time the stage starts and ends. Can you describe how the stage-level metrics would work with the last timestamp for any peak metric? Would there be a check to see if the event is being read from the history log?
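The per-executor and per-stage peak semantics described here reduce to an element-wise maximum over successive heartbeat readings. A minimal sketch, assuming a fixed number of metrics and an initial value of -1 meaning that nothing has been recorded yet (that convention is borrowed from the PeakExecutorMetrics doc comment quoted elsewhere in this thread); `PeakTracker` is a hypothetical name, and `compareAndUpdate` is modelled on the call visible in the AppStatusListener diff rather than copied from the PR.

```scala
// Tracks the running peak of each metric across a series of updates.
final class PeakTracker(numMetrics: Int) {
  // -1 marks "no value recorded yet" (an assumption in this sketch).
  private val peaks: Array[Long] = Array.fill(numMetrics)(-1L)

  /** Raises each stored peak to the new value where it is larger.
    * Returns true if at least one peak changed. */
  def compareAndUpdate(updates: Array[Long]): Boolean = {
    require(updates.length == peaks.length, "one value per metric expected")
    var updated = false
    var i = 0
    while (i < peaks.length) {
      if (updates(i) > peaks(i)) {
        peaks(i) = updates(i)
        updated = true
      }
      i += 1
    }
    updated
  }

  /** Defensive copy of the current peaks. */
  def snapshot: Array[Long] = peaks.clone()
}
```

One tracker kept for the executor's lifetime would feed the executors tab, and one tracker per (stage, attempt, executor), created when the stage starts and dropped when it completes, would feed the stages tab. No timestamp is needed for either view.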
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r195955081 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -169,6 +182,31 @@ private[spark] class EventLoggingListener( // Events that trigger a flush override def onStageCompleted(event: SparkListenerStageCompleted): Unit = { +if (shouldLogExecutorMetricsUpdates) { + // clear out any previous attempts, that did not have a stage completed event + val prevAttemptId = event.stageInfo.attemptNumber() - 1 + for (attemptId <- 0 to prevAttemptId) { +liveStageExecutorMetrics.remove((event.stageInfo.stageId, attemptId)) + } + + // log the peak executor metrics for the stage, for each live executor, + // whether or not the executor is running tasks for the stage + val accumUpdates = new ArrayBuffer[(Long, Int, Int, Seq[AccumulableInfo])]() + val executorMap = liveStageExecutorMetrics.remove( +(event.stageInfo.stageId, event.stageInfo.attemptNumber())) + executorMap.foreach { + executorEntry => { + for ((executorId, peakExecutorMetrics) <- executorEntry) { +val executorMetrics = new ExecutorMetrics(-1, peakExecutorMetrics.metrics) --- End diff -- I can see how this would work, but it also seems far more confusing than necessary. My understanding was that you'd always log the last timestamp which replaced the peak value for *any* metric. Are you ever logging something other than -1 for the timestamp? If not, we just shouldn't put any timestamp in the log. It might be helpful to step back a bit and, rather than focusing on the mechanics of what you're doing now, discuss the desired end behavior in the history server and the live UI based on the timestamp.
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r195892287 --- Diff: core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala --- @@ -251,6 +261,222 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit } } + /** + * Test executor metrics update logging functionality. This checks that a + * SparkListenerExecutorMetricsUpdate event is added to the Spark history + * log if one of the executor metrics is larger than any previously + * recorded value for the metric, per executor per stage. The task metrics + * should not be added. + */ + private def testExecutorMetricsUpdateEventLogging() { +val conf = getLoggingConf(testDirPath, None) +val logName = "executorMetricsUpdated-test" +val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf) +val listenerBus = new LiveListenerBus(conf) + +// expected ExecutorMetricsUpdate, for the given stage id and executor id +val expectedMetricsEvents: Map[(Int, String), SparkListenerExecutorMetricsUpdate] = --- End diff -- I'll add a check. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r195892271 --- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala --- @@ -98,14 +101,53 @@ class ExecutorSummary private[spark]( val removeReason: Option[String], val executorLogs: Map[String, String], val memoryMetrics: Option[MemoryMetrics], -val blacklistedInStages: Set[Int]) +val blacklistedInStages: Set[Int], +@JsonSerialize(using = classOf[PeakMemoryMetricsSerializer]) +@JsonDeserialize(using = classOf[PeakMemoryMetricsDeserializer]) +val peakMemoryMetrics: Option[Array[Long]]) class MemoryMetrics private[spark]( val usedOnHeapStorageMemory: Long, val usedOffHeapStorageMemory: Long, val totalOnHeapStorageMemory: Long, val totalOffHeapStorageMemory: Long) +/** deserialzer for peakMemoryMetrics: convert to array ordered by metric name */ +class PeakMemoryMetricsDeserializer extends JsonDeserializer[Option[Array[Long]]] { --- End diff -- Yes, changed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r195892263 --- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala --- @@ -98,14 +101,53 @@ class ExecutorSummary private[spark]( val removeReason: Option[String], val executorLogs: Map[String, String], val memoryMetrics: Option[MemoryMetrics], -val blacklistedInStages: Set[Int]) +val blacklistedInStages: Set[Int], +@JsonSerialize(using = classOf[PeakMemoryMetricsSerializer]) +@JsonDeserialize(using = classOf[PeakMemoryMetricsDeserializer]) +val peakMemoryMetrics: Option[Array[Long]]) class MemoryMetrics private[spark]( val usedOnHeapStorageMemory: Long, val usedOffHeapStorageMemory: Long, val totalOnHeapStorageMemory: Long, val totalOffHeapStorageMemory: Long) +/** deserialzer for peakMemoryMetrics: convert to array ordered by metric name */ +class PeakMemoryMetricsDeserializer extends JsonDeserializer[Option[Array[Long]]] { + override def deserialize( + jsonParser: JsonParser, + deserializationContext: DeserializationContext): Option[Array[Long]] = { +val metricsMap = jsonParser.readValueAs(classOf[Option[Map[String, Object]]]) --- End diff -- That works and is much cleaner, thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org