[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-09-07 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21221


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-09-07 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r215899049
  
--- Diff: core/src/main/java/org/apache/spark/SparkFirehoseListener.java ---
@@ -103,6 +103,12 @@ public final void onExecutorMetricsUpdate(
 onEvent(executorMetricsUpdate);
   }
 
+  @Override
+  public final void onStageExecutorMetrics(
+  SparkListenerStageExecutorMetrics executorMetrics) {
--- End diff --

nit: remove extra spaces for better indent


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-16 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r210691276
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -69,6 +69,11 @@ package object config {
 .bytesConf(ByteUnit.KiB)
 .createWithDefaultString("100k")
 
+  private[spark] val EVENT_LOG_STAGE_EXECUTOR_METRICS =
+ConfigBuilder("spark.eventLog.logStageExecutorMetrics.enabled")
+  .booleanConf
+  .createWithDefault(true)
--- End diff --

That would be safer. I'll change it to false, and we can change it to true 
after people have had a chance to test it out.
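
For reference, the agreed change would presumably read as follows (a sketch following the `ConfigBuilder` pattern in the hunk above):

```scala
private[spark] val EVENT_LOG_STAGE_EXECUTOR_METRICS =
  ConfigBuilder("spark.eventLog.logStageExecutorMetrics.enabled")
    .booleanConf
    .createWithDefault(false)
```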


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-16 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r210690505
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -216,8 +217,7 @@ private[spark] class Executor(
 
   def stop(): Unit = {
 env.metricsSystem.report()
-heartbeater.shutdown()
-heartbeater.awaitTermination(10, TimeUnit.SECONDS)
+heartbeater.stop()
--- End diff --

Added.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-16 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r210492311
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -216,8 +217,7 @@ private[spark] class Executor(
 
   def stop(): Unit = {
 env.metricsSystem.report()
-heartbeater.shutdown()
-heartbeater.awaitTermination(10, TimeUnit.SECONDS)
+heartbeater.stop()
--- End diff --

future: wrap this in `try { ... } catch { case NonFatal(e) => ... }`?
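
A minimal sketch of that suggestion (it assumes `Executor`'s usual `Logging` mixin provides `logWarning`):

```scala
import scala.util.control.NonFatal

def stop(): Unit = {
  env.metricsSystem.report()
  try {
    heartbeater.stop()
  } catch {
    // a non-fatal failure to stop the heartbeater should not abort shutdown
    case NonFatal(e) => logWarning("Unable to stop heartbeater", e)
  }
  // ... rest of stop() unchanged
}
```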


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-16 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r210492513
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -69,6 +69,11 @@ package object config {
 .bytesConf(ByteUnit.KiB)
 .createWithDefaultString("100k")
 
+  private[spark] val EVENT_LOG_STAGE_EXECUTOR_METRICS =
+ConfigBuilder("spark.eventLog.logStageExecutorMetrics.enabled")
+  .booleanConf
+  .createWithDefault(true)
--- End diff --

should this be "false" for now until we can test this out more, just to 
be on the safe side?


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-13 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r209773404
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -669,6 +686,31 @@ private[spark] class AppStatusListener(
 }
   }
 }
+
+// check if there is a new peak value for any of the executor level 
memory metrics
+// for the live UI. SparkListenerExecutorMetricsUpdate events are only 
processed
+// for the live UI.
+event.executorUpdates.foreach { updates: ExecutorMetrics =>
+  liveExecutors.get(event.execId).foreach { exec: LiveExecutor =>
+if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(updates)) {
+  maybeUpdate(exec, now)
+}
+  }
+}
+  }
+
+  override def onStageExecutorMetrics(executorMetrics: 
SparkListenerStageExecutorMetrics): Unit = {
+val now = System.nanoTime()
--- End diff --

Yup that's fine.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-13 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r209772320
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -669,6 +686,31 @@ private[spark] class AppStatusListener(
 }
   }
 }
+
+// check if there is a new peak value for any of the executor level 
memory metrics
+// for the live UI. SparkListenerExecutorMetricsUpdate events are only 
processed
+// for the live UI.
+event.executorUpdates.foreach { updates: ExecutorMetrics =>
+  liveExecutors.get(event.execId).foreach { exec: LiveExecutor =>
+if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(updates)) {
+  maybeUpdate(exec, now)
+}
+  }
+}
+  }
+
+  override def onStageExecutorMetrics(executorMetrics: 
SparkListenerStageExecutorMetrics): Unit = {
+val now = System.nanoTime()
--- End diff --

The rest of the file is using System.nanoTime() -- it seems more consistent 
to keep it the same. Clock has getTimeMillis(), although we could always 
multiply by 1,000,000 to get nanoseconds; not sure if the precision would matter.
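
To make the trade-off concrete (a hypothetical sketch; `clock` would be an injected `org.apache.spark.util.Clock` such as `SystemClock`, which is not part of this PR):

```scala
// current convention in the file: fine-grained, but not mockable in tests
val now = System.nanoTime()
// Clock-based alternative: mockable via ManualClock, but millisecond precision
// val now = clock.getTimeMillis()
```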


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-13 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r209771443
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -669,6 +686,31 @@ private[spark] class AppStatusListener(
 }
   }
 }
+
+// check if there is a new peak value for any of the executor level 
memory metrics
+// for the live UI. SparkListenerExecutorMetricsUpdate events are only 
processed
+// for the live UI.
+event.executorUpdates.foreach { updates: ExecutorMetrics =>
--- End diff --

No, these don't need to be explicitly defined.
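
So the block above could rely on inference, e.g.:

```scala
// parameter types are inferred from executorUpdates and liveExecutors
event.executorUpdates.foreach { updates =>
  liveExecutors.get(event.execId).foreach { exec =>
    if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(updates)) {
      maybeUpdate(exec, now)
    }
  }
}
```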


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-13 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r209770605
  
--- Diff: 
core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.metrics
+
+import java.lang.management.{BufferPoolMXBean, ManagementFactory}
+import javax.management.ObjectName
+
+import org.apache.spark.memory.MemoryManager
+
+/**
+ * Executor metric types for executor-level metrics stored in 
ExecutorMetrics.
+ */
+sealed trait ExecutorMetricType {
+  private[spark] def getMetricValue(memoryManager: MemoryManager): Long
+  private[spark] val name = 
getClass().getName().stripSuffix("$").split("""\.""").last
+}
+
+private[spark] abstract class MemoryManagerExecutorMetricType(
+f: MemoryManager => Long) extends ExecutorMetricType {
+  override private[spark] def getMetricValue(memoryManager: 
MemoryManager): Long = {
+f(memoryManager)
+  }
+}
+
+private[spark]abstract class MBeanExecutorMetricType(mBeanName: String)
--- End diff --

Added.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-13 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r209770476
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.executor
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.metrics.ExecutorMetricType
+
+/**
+ * :: DeveloperApi ::
+ * Metrics tracked for executors and the driver.
+ *
+ * Executor-level metrics are sent from each executor to the driver as 
part of the Heartbeat.
+ */
+@DeveloperApi
+class ExecutorMetrics private[spark] extends Serializable {
+
+  // Metrics are indexed by MetricGetter.values
+  private val metrics = new Array[Long](ExecutorMetricType.values.length)
+
+  // the first element is initialized to -1, indicating that the values 
for the array
+  // haven't been set yet.
+  metrics(0) = -1
+
+  /** Returns the value for the specified metricType. */
+  def getMetricValue(metricType: ExecutorMetricType): Long = {
+metrics(ExecutorMetricType.metricIdxMap(metricType))
+  }
+
+  /** Returns true if the values for the metrics have been set, false 
otherwise. */
+  def isSet(): Boolean = metrics(0) > -1
+
+  private[spark] def this(metrics: Array[Long]) {
+this()
+Array.copy(metrics, 0, this.metrics, 0, Math.min(metrics.size, 
this.metrics.size))
+  }
+
+  /**
+   * Constructor: create the ExecutorMetrics with the values specified.
+   *
+   * @param executorMetrics map of executor metric name to value
+   */
+  private[spark] def this(executorMetrics: Map[String, Long]) {
+this()
+(0 until ExecutorMetricType.values.length).foreach { idx =>
+  metrics(idx) = 
executorMetrics.getOrElse(ExecutorMetricType.values(idx).name, 0L)
+}
+  }
+
+  /**
+   * Compare the specified executor metrics values with the current 
executor metric values,
+   * and update the value for any metrics where the new value for the 
metric is larger.
+   *
+   * @param executorMetrics the executor metrics to compare
+   * @return if there is a new peak value for any metric
+   */
+  private[spark] def compareAndUpdatePeakValues(executorMetrics: 
ExecutorMetrics): Boolean = {
+var updated: Boolean = false
--- End diff --

Removed.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-13 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r209770404
  
--- Diff: core/src/main/scala/org/apache/spark/memory/MemoryManager.scala 
---
@@ -180,6 +180,26 @@ private[spark] abstract class MemoryManager(
 onHeapStorageMemoryPool.memoryUsed + 
offHeapStorageMemoryPool.memoryUsed
   }
 
+  /**
+   *  On heap execution memory currently in use, in bytes.
+   */
+  final def onHeapExecutionMemoryUsed: Long = 
onHeapExecutionMemoryPool.memoryUsed
--- End diff --

That's true -- I'll change the methods to synchronized.
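
Presumably along these lines, mirroring the synchronized accessors already in `MemoryManager` (a sketch):

```scala
/**
 *  On heap execution memory currently in use, in bytes.
 */
final def onHeapExecutionMemoryUsed: Long = synchronized {
  onHeapExecutionMemoryPool.memoryUsed
}
```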


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-13 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r209770440
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.executor
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.metrics.ExecutorMetricType
+
+/**
+ * :: DeveloperApi ::
+ * Metrics tracked for executors and the driver.
+ *
+ * Executor-level metrics are sent from each executor to the driver as 
part of the Heartbeat.
+ */
+@DeveloperApi
+class ExecutorMetrics private[spark] extends Serializable {
+
+  // Metrics are indexed by MetricGetter.values
+  private val metrics = new Array[Long](ExecutorMetricType.values.length)
+
+  // the first element is initialized to -1, indicating that the values 
for the array
+  // haven't been set yet.
+  metrics(0) = -1
+
+  /** Returns the value for the specified metricType. */
+  def getMetricValue(metricType: ExecutorMetricType): Long = {
+metrics(ExecutorMetricType.metricIdxMap(metricType))
+  }
+
+  /** Returns true if the values for the metrics have been set, false 
otherwise. */
+  def isSet(): Boolean = metrics(0) > -1
+
+  private[spark] def this(metrics: Array[Long]) {
+this()
+Array.copy(metrics, 0, this.metrics, 0, Math.min(metrics.size, 
this.metrics.size))
+  }
+
+  /**
+   * Constructor: create the ExecutorMetrics with the values specified.
+   *
+   * @param executorMetrics map of executor metric name to value
+   */
+  private[spark] def this(executorMetrics: Map[String, Long]) {
+this()
+(0 until ExecutorMetricType.values.length).foreach { idx =>
+  metrics(idx) = 
executorMetrics.getOrElse(ExecutorMetricType.values(idx).name, 0L)
+}
+  }
+
+  /**
+   * Compare the specified executor metrics values with the current 
executor metric values,
+   * and update the value for any metrics where the new value for the 
metric is larger.
+   *
+   * @param executorMetrics the executor metrics to compare
+   * @return if there is a new peak value for any metric
+   */
+  private[spark] def compareAndUpdatePeakValues(executorMetrics: 
ExecutorMetrics): Boolean = {
+var updated: Boolean = false
+
+(0 until ExecutorMetricType.values.length).foreach { idx =>
+   if ( executorMetrics.metrics(idx) > metrics(idx)) {
--- End diff --

Fixed.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-13 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r209717523
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -669,6 +686,31 @@ private[spark] class AppStatusListener(
 }
   }
 }
+
+// check if there is a new peak value for any of the executor level 
memory metrics
+// for the live UI. SparkListenerExecutorMetricsUpdate events are only 
processed
+// for the live UI.
+event.executorUpdates.foreach { updates: ExecutorMetrics =>
+  liveExecutors.get(event.execId).foreach { exec: LiveExecutor =>
+if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(updates)) {
+  maybeUpdate(exec, now)
+}
+  }
+}
+  }
+
+  override def onStageExecutorMetrics(executorMetrics: 
SparkListenerStageExecutorMetrics): Unit = {
+val now = System.nanoTime()
--- End diff --

Should we use a `Clock` instance here for testing?


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-13 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r209715001
  
--- Diff: core/src/main/scala/org/apache/spark/memory/MemoryManager.scala 
---
@@ -180,6 +180,26 @@ private[spark] abstract class MemoryManager(
 onHeapStorageMemoryPool.memoryUsed + 
offHeapStorageMemoryPool.memoryUsed
   }
 
+  /**
+   *  On heap execution memory currently in use, in bytes.
+   */
+  final def onHeapExecutionMemoryUsed: Long = 
onHeapExecutionMemoryPool.memoryUsed
+
+  /**
+   *  Off heap execution memory currently in use, in bytes.
+   */
+  final def offHeapExecutionMemoryUsed: Long = 
offHeapExecutionMemoryPool.memoryUsed
--- End diff --

`synchronized` here too, and in the two methods below.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-13 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r209715208
  
--- Diff: 
core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.metrics
+
+import java.lang.management.{BufferPoolMXBean, ManagementFactory}
+import javax.management.ObjectName
+
+import org.apache.spark.memory.MemoryManager
+
+/**
+ * Executor metric types for executor-level metrics stored in 
ExecutorMetrics.
+ */
+sealed trait ExecutorMetricType {
+  private[spark] def getMetricValue(memoryManager: MemoryManager): Long
+  private[spark] val name = 
getClass().getName().stripSuffix("$").split("""\.""").last
+}
+
+private[spark] abstract class MemoryManagerExecutorMetricType(
+f: MemoryManager => Long) extends ExecutorMetricType {
+  override private[spark] def getMetricValue(memoryManager: 
MemoryManager): Long = {
+f(memoryManager)
+  }
+}
+
+private[spark]abstract class MBeanExecutorMetricType(mBeanName: String)
--- End diff --

Put a space after `[spark]`.
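
That is:

```scala
private[spark] abstract class MBeanExecutorMetricType(mBeanName: String)
```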


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-13 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r209716819
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -669,6 +686,31 @@ private[spark] class AppStatusListener(
 }
   }
 }
+
+// check if there is a new peak value for any of the executor level 
memory metrics
+// for the live UI. SparkListenerExecutorMetricsUpdate events are only 
processed
+// for the live UI.
+event.executorUpdates.foreach { updates: ExecutorMetrics =>
--- End diff --

Does the type need to be explicitly defined here and in the next line?


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-13 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r209714796
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.executor
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.metrics.ExecutorMetricType
+
+/**
+ * :: DeveloperApi ::
+ * Metrics tracked for executors and the driver.
+ *
+ * Executor-level metrics are sent from each executor to the driver as 
part of the Heartbeat.
+ */
+@DeveloperApi
+class ExecutorMetrics private[spark] extends Serializable {
+
+  // Metrics are indexed by MetricGetter.values
+  private val metrics = new Array[Long](ExecutorMetricType.values.length)
+
+  // the first element is initialized to -1, indicating that the values 
for the array
+  // haven't been set yet.
+  metrics(0) = -1
+
+  /** Returns the value for the specified metricType. */
+  def getMetricValue(metricType: ExecutorMetricType): Long = {
+metrics(ExecutorMetricType.metricIdxMap(metricType))
+  }
+
+  /** Returns true if the values for the metrics have been set, false 
otherwise. */
+  def isSet(): Boolean = metrics(0) > -1
+
+  private[spark] def this(metrics: Array[Long]) {
+this()
+Array.copy(metrics, 0, this.metrics, 0, Math.min(metrics.size, 
this.metrics.size))
+  }
+
+  /**
+   * Constructor: create the ExecutorMetrics with the values specified.
+   *
+   * @param executorMetrics map of executor metric name to value
+   */
+  private[spark] def this(executorMetrics: Map[String, Long]) {
+this()
+(0 until ExecutorMetricType.values.length).foreach { idx =>
+  metrics(idx) = 
executorMetrics.getOrElse(ExecutorMetricType.values(idx).name, 0L)
+}
+  }
+
+  /**
+   * Compare the specified executor metrics values with the current 
executor metric values,
+   * and update the value for any metrics where the new value for the 
metric is larger.
+   *
+   * @param executorMetrics the executor metrics to compare
+   * @return if there is a new peak value for any metric
+   */
+  private[spark] def compareAndUpdatePeakValues(executorMetrics: 
ExecutorMetrics): Boolean = {
+var updated: Boolean = false
+
+(0 until ExecutorMetricType.values.length).foreach { idx =>
+   if ( executorMetrics.metrics(idx) > metrics(idx)) {
--- End diff --

Nit: No space after the left bracket.
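
That is:

```scala
if (executorMetrics.metrics(idx) > metrics(idx)) {
```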


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-13 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r209714883
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.executor
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.metrics.ExecutorMetricType
+
+/**
+ * :: DeveloperApi ::
+ * Metrics tracked for executors and the driver.
+ *
+ * Executor-level metrics are sent from each executor to the driver as 
part of the Heartbeat.
+ */
+@DeveloperApi
+class ExecutorMetrics private[spark] extends Serializable {
+
+  // Metrics are indexed by MetricGetter.values
+  private val metrics = new Array[Long](ExecutorMetricType.values.length)
+
+  // the first element is initialized to -1, indicating that the values 
for the array
+  // haven't been set yet.
+  metrics(0) = -1
+
+  /** Returns the value for the specified metricType. */
+  def getMetricValue(metricType: ExecutorMetricType): Long = {
+metrics(ExecutorMetricType.metricIdxMap(metricType))
+  }
+
+  /** Returns true if the values for the metrics have been set, false 
otherwise. */
+  def isSet(): Boolean = metrics(0) > -1
+
+  private[spark] def this(metrics: Array[Long]) {
+this()
+Array.copy(metrics, 0, this.metrics, 0, Math.min(metrics.size, 
this.metrics.size))
+  }
+
+  /**
+   * Constructor: create the ExecutorMetrics with the values specified.
+   *
+   * @param executorMetrics map of executor metric name to value
+   */
+  private[spark] def this(executorMetrics: Map[String, Long]) {
+this()
+(0 until ExecutorMetricType.values.length).foreach { idx =>
+  metrics(idx) = 
executorMetrics.getOrElse(ExecutorMetricType.values(idx).name, 0L)
+}
+  }
+
+  /**
+   * Compare the specified executor metrics values with the current 
executor metric values,
+   * and update the value for any metrics where the new value for the 
metric is larger.
+   *
+   * @param executorMetrics the executor metrics to compare
+   * @return if there is a new peak value for any metric
+   */
+  private[spark] def compareAndUpdatePeakValues(executorMetrics: 
ExecutorMetrics): Boolean = {
+var updated: Boolean = false
--- End diff --

No need to specifically label this as `Boolean`.
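
i.e. let type inference do the work:

```scala
var updated = false
```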


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-13 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r209712159
  
--- Diff: core/src/main/scala/org/apache/spark/memory/MemoryManager.scala 
---
@@ -180,6 +180,26 @@ private[spark] abstract class MemoryManager(
 onHeapStorageMemoryPool.memoryUsed + 
offHeapStorageMemoryPool.memoryUsed
   }
 
+  /**
+   *  On heap execution memory currently in use, in bytes.
+   */
+  final def onHeapExecutionMemoryUsed: Long = 
onHeapExecutionMemoryPool.memoryUsed
--- End diff --

It probably should be if only because the variable is annotated with 
`@GuardedBy(this)`, so it makes the code more consistent to mark this as 
synchronized.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-13 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r209711557
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.executor
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.metrics.ExecutorMetricType
+
+/**
+ * :: DeveloperApi ::
+ * Metrics tracked for executors and the driver.
+ *
+ * Executor-level metrics are sent from each executor to the driver as 
part of the Heartbeat.
+ */
+@DeveloperApi
+class ExecutorMetrics private[spark] extends Serializable {
+
+  // Metrics are indexed by MetricGetter.values
+  private val metrics = new Array[Long](ExecutorMetricType.values.length)
--- End diff --

Unclear - if we expose these metrics to some external consumer via an API 
for example, then we almost certainly want to have a schema labelling these 
fields for consumption by e.g. dashboards. I think what we have here is fine 
for now.
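
If such an API were added later, one sketch of a labelled view (hypothetical; it reuses `ExecutorMetricType.values` and `name` from this PR):

```scala
// expose the positional Array[Long] as metricName -> value for dashboards
def labelledMetrics(m: ExecutorMetrics): Map[String, Long] =
  ExecutorMetricType.values.map(t => t.name -> m.getMetricValue(t)).toMap
```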


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-04 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r207723205
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.executor
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.metrics.ExecutorMetricType
+
+/**
+ * :: DeveloperApi ::
+ * Metrics tracked for executors and the driver.
+ *
+ * Executor-level metrics are sent from each executor to the driver as 
part of the Heartbeat.
+ */
+@DeveloperApi
+class ExecutorMetrics private[spark] extends Serializable {
+
+  // Metrics are indexed by MetricGetter.values
+  private val metrics = new Array[Long](ExecutorMetricType.values.length)
--- End diff --

Is it likely that users would want to access the individual fields, rather 
than iterating through all of them? The first option would be a bit nicer if so.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-04 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r207723188
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -296,7 +338,7 @@ private[spark] object EventLoggingListener extends 
Logging {
   private val LOG_FILE_PERMISSIONS = new 
FsPermission(Integer.parseInt("770", 8).toShort)
 
   // A cache for compression codecs to avoid creating the same codec many 
times
-  private val codecMap = new mutable.HashMap[String, CompressionCodec]
+  private val codecMap = new HashMap[String, CompressionCodec]
--- End diff --

Changed.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-04 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r207723177
  
--- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala ---
@@ -98,14 +103,50 @@ class ExecutorSummary private[spark](
 val removeReason: Option[String],
 val executorLogs: Map[String, String],
 val memoryMetrics: Option[MemoryMetrics],
-val blacklistedInStages: Set[Int])
+val blacklistedInStages: Set[Int],
+@JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer])
+@JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer])
+val peakMemoryMetrics: Option[ExecutorMetrics])
 
 class MemoryMetrics private[spark](
 val usedOnHeapStorageMemory: Long,
 val usedOffHeapStorageMemory: Long,
 val totalOnHeapStorageMemory: Long,
 val totalOffHeapStorageMemory: Long)
 
+/** deserializer for peakMemoryMetrics: convert map to ExecutorMetrics */
+private[spark] class ExecutorMetricsJsonDeserializer
+  extends JsonDeserializer[Option[ExecutorMetrics]] {
+  override def deserialize(
+  jsonParser: JsonParser,
+  deserializationContext: DeserializationContext): 
Option[ExecutorMetrics] = {
+val metricsMap = jsonParser.readValueAs[Option[Map[String, Long]]](
+  new TypeReference[Option[Map[String, java.lang.Long]]] {})
+metricsMap match {
+  case Some(metrics) =>
+Some(new ExecutorMetrics(metrics))
+  case None => None
--- End diff --

Changed.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-04 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r207723173
  
--- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala ---
@@ -98,14 +103,50 @@ class ExecutorSummary private[spark](
 val removeReason: Option[String],
 val executorLogs: Map[String, String],
 val memoryMetrics: Option[MemoryMetrics],
-val blacklistedInStages: Set[Int])
+val blacklistedInStages: Set[Int],
+@JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer])
+@JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer])
+val peakMemoryMetrics: Option[ExecutorMetrics])
 
 class MemoryMetrics private[spark](
 val usedOnHeapStorageMemory: Long,
 val usedOffHeapStorageMemory: Long,
 val totalOnHeapStorageMemory: Long,
 val totalOffHeapStorageMemory: Long)
 
+/** deserializer for peakMemoryMetrics: convert map to ExecutorMetrics */
+private[spark] class ExecutorMetricsJsonDeserializer
+  extends JsonDeserializer[Option[ExecutorMetrics]] {
--- End diff --

Done.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-04 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r207723165
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -669,6 +686,34 @@ private[spark] class AppStatusListener(
 }
   }
 }
+
+// check if there is a new peak value for any of the executor level 
memory metrics
+// for the live UI. SparkListenerExecutorMetricsUpdate events are only 
processed
+// for the live UI.
+event.executorUpdates.foreach { updates: ExecutorMetrics =>
+  liveExecutors.get(event.execId).foreach { exec: LiveExecutor =>
+if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(updates)) {
+  maybeUpdate(exec, now)
+}
+  }
+}
+  }
+
+  override def onStageExecutorMetrics(executorMetrics: 
SparkListenerStageExecutorMetrics): Unit = {
+val now = System.nanoTime()
+
+// check if there is a new peak value for any of the executor level 
memory metrics,
+// while reading from the log. SparkListenerStageExecutorMetrics are 
only processed
+// when reading logs.
+liveExecutors.get(executorMetrics.execId)
+  .orElse(deadExecutors.get(executorMetrics.execId)) match {
+  case Some(exec) =>
+ if 
(exec.peakExecutorMetrics.compareAndUpdatePeakValues(executorMetrics.executorMetrics))
 {
+  maybeUpdate(exec, now)
--- End diff --

Yes, this is called on replay. I've removed the "maybeUpdate".


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-04 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r207723141
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala 
---
@@ -251,6 +261,214 @@ class EventLoggingListenerSuite extends SparkFunSuite 
with LocalSparkContext wit
 }
   }
 
+  /**
+   * Test stage executor metrics logging functionality. This checks that 
peak
+   * values from SparkListenerExecutorMetricsUpdate events during a stage 
are
+   * logged in a StageExecutorMetrics event for each executor at stage 
completion.
+   */
+  private def testStageExecutorMetricsEventLogging() {
+val conf = getLoggingConf(testDirPath, None)
+val logName = "stageExecutorMetrics-test"
+val eventLogger = new EventLoggingListener(logName, None, 
testDirPath.toUri(), conf)
+val listenerBus = new LiveListenerBus(conf)
+
+// expected StageExecutorMetrics, for the given stage id and executor 
id
+val expectedMetricsEvents: Map[(Int, String), 
SparkListenerStageExecutorMetrics] =
--- End diff --

Moved to after the replay.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-04 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r207723114
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala ---
@@ -217,7 +218,12 @@ class ReplayListenerSuite extends SparkFunSuite with 
BeforeAndAfter with LocalSp
 
 // Verify the same events are replayed in the same order
 assert(sc.eventLogger.isDefined)
-val originalEvents = sc.eventLogger.get.loggedEvents
+val originalEvents = sc.eventLogger.get.loggedEvents.filter { e =>
+  JsonProtocol.sparkEventFromJson(e) match {
--- End diff --

Changed.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-04 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r207723098
  
--- Diff: core/src/main/scala/org/apache/spark/util/JsonProtocol.scala ---
@@ -691,7 +723,19 @@ private[spark] object JsonProtocol {
 (json \ "Accumulator 
Updates").extract[List[JValue]].map(accumulableInfoFromJson)
   (taskId, stageId, stageAttemptId, updates)
 }
-SparkListenerExecutorMetricsUpdate(execInfo, accumUpdates)
+val executorUpdates = jsonOption(json \ "Executor Metrics Updated") 
match {
--- End diff --

Changed.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-04 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r207722892
  
--- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala ---
@@ -98,14 +103,50 @@ class ExecutorSummary private[spark](
 val removeReason: Option[String],
 val executorLogs: Map[String, String],
 val memoryMetrics: Option[MemoryMetrics],
-val blacklistedInStages: Set[Int])
+val blacklistedInStages: Set[Int],
+@JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer])
+@JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer])
+val peakMemoryMetrics: Option[ExecutorMetrics])
 
 class MemoryMetrics private[spark](
 val usedOnHeapStorageMemory: Long,
 val usedOffHeapStorageMemory: Long,
 val totalOnHeapStorageMemory: Long,
 val totalOffHeapStorageMemory: Long)
 
+/** deserializer for peakMemoryMetrics: convert map to ExecutorMetrics */
+private[spark] class ExecutorMetricsJsonDeserializer
+  extends JsonDeserializer[Option[ExecutorMetrics]] {
+  override def deserialize(
+  jsonParser: JsonParser,
+  deserializationContext: DeserializationContext): 
Option[ExecutorMetrics] = {
+val metricsMap = jsonParser.readValueAs[Option[Map[String, Long]]](
+  new TypeReference[Option[Map[String, java.lang.Long]]] {})
+metricsMap match {
+  case Some(metrics) =>
+Some(new ExecutorMetrics(metrics))
+  case None => None
+}
+  }
+}
+/** serializer for peakMemoryMetrics: convert ExecutorMetrics to map with 
metric name as key */
+private[spark] class ExecutorMetricsJsonSerializer
+  extends JsonSerializer[Option[ExecutorMetrics]] {
--- End diff --

It doesn't serialize -- nothing is added to the JSON.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-04 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r207722887
  
--- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala ---
@@ -98,14 +103,50 @@ class ExecutorSummary private[spark](
 val removeReason: Option[String],
 val executorLogs: Map[String, String],
 val memoryMetrics: Option[MemoryMetrics],
-val blacklistedInStages: Set[Int])
+val blacklistedInStages: Set[Int],
+@JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer])
+@JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer])
+val peakMemoryMetrics: Option[ExecutorMetrics])
 
 class MemoryMetrics private[spark](
 val usedOnHeapStorageMemory: Long,
 val usedOffHeapStorageMemory: Long,
 val totalOnHeapStorageMemory: Long,
 val totalOffHeapStorageMemory: Long)
 
+/** deserializer for peakMemoryMetrics: convert map to ExecutorMetrics */
+private[spark] class ExecutorMetricsJsonDeserializer
+  extends JsonDeserializer[Option[ExecutorMetrics]] {
+  override def deserialize(
+  jsonParser: JsonParser,
+  deserializationContext: DeserializationContext): 
Option[ExecutorMetrics] = {
+val metricsMap = jsonParser.readValueAs[Option[Map[String, Long]]](
+  new TypeReference[Option[Map[String, java.lang.Long]]] {})
+metricsMap match {
+  case Some(metrics) =>
+Some(new ExecutorMetrics(metrics))
+  case None => None
+}
+  }
+}
+/** serializer for peakMemoryMetrics: convert ExecutorMetrics to map with 
metric name as key */
+private[spark] class ExecutorMetricsJsonSerializer
+  extends JsonSerializer[Option[ExecutorMetrics]] {
+  override def serialize(
+  metrics: Option[ExecutorMetrics],
+  jsonGenerator: JsonGenerator,
+  serializerProvider: SerializerProvider): Unit = {
+metrics match {
+  case Some(m) =>
--- End diff --

Changed.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-04 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r207722865
  
--- Diff: core/src/main/scala/org/apache/spark/status/LiveEntity.scala ---
@@ -302,10 +305,10 @@ private class LiveExecutor(val executorId: String, 
_addTime: Long) extends LiveE
   Option(removeReason),
   executorLogs,
   memoryMetrics,
-  blacklistedInStages)
+  blacklistedInStages,
+  if (peakExecutorMetrics.isSet()) Some(peakExecutorMetrics) else None)
--- End diff --

Changed.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-04 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r207722773
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -155,7 +160,14 @@ private[spark] class EventLoggingListener(
   }
 
   // Events that do not trigger a flush
-  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit 
= logEvent(event)
+  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit 
= {
+logEvent(event)
+if (shouldLogStageExecutorMetrics) {
+  // record the peak metrics for the new stage
+  liveStageExecutorMetrics.put((event.stageInfo.stageId, 
event.stageInfo.attemptNumber()),
+new HashMap[String, ExecutorMetrics]())
--- End diff --

Modified.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-04 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r207722724
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -93,6 +95,9 @@ private[spark] class EventLoggingListener(
   // Visible for tests only.
   private[scheduler] val logPath = getLogPath(logBaseDir, appId, 
appAttemptId, compressionCodecName)
 
+  // map of (stageId, stageAttempt), to peak executor metrics for the stage
+  private val liveStageExecutorMetrics = HashMap[(Int, Int), 
HashMap[String, ExecutorMetrics]]()
--- End diff --

Yes, mutable Maps will work, and better to go with the more generic version.
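
e.g. a sketch of the more generic declaration:

```scala
import scala.collection.mutable

// map of (stageId, stageAttempt) to per-executor peak metrics for the stage
private val liveStageExecutorMetrics =
  mutable.Map.empty[(Int, Int), mutable.Map[String, ExecutorMetrics]]
```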


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-04 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r207722674
  
--- Diff: 
core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.metrics
+
+import java.lang.management.{BufferPoolMXBean, ManagementFactory}
+import javax.management.ObjectName
+
+import org.apache.spark.memory.MemoryManager
+
+/**
+ * Executor metric types for executor-level metrics stored in 
ExecutorMetrics.
+ */
+sealed trait ExecutorMetricType {
+  private[spark] def getMetricValue(memoryManager: MemoryManager): Long
--- End diff --

Let's stick with the current version for now, and revisit if we end up 
adding more metric types.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-04 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r207722621
  
--- Diff: core/src/main/scala/org/apache/spark/memory/MemoryManager.scala 
---
@@ -180,6 +180,26 @@ private[spark] abstract class MemoryManager(
 onHeapStorageMemoryPool.memoryUsed + 
offHeapStorageMemoryPool.memoryUsed
   }
 
+  /**
+   *  On heap execution memory currently in use, in bytes.
+   */
+  final def onHeapExecutionMemoryUsed: Long = 
onHeapExecutionMemoryPool.memoryUsed
--- End diff --

Since it accesses just one Long field, I don't think so. The other methods, 
which are synchronized, are summing two fields.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-01 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r207073149
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -669,6 +686,34 @@ private[spark] class AppStatusListener(
 }
   }
 }
+
+// check if there is a new peak value for any of the executor level 
memory metrics
+// for the live UI. SparkListenerExecutorMetricsUpdate events are only 
processed
+// for the live UI.
+event.executorUpdates.foreach { updates: ExecutorMetrics =>
+  liveExecutors.get(event.execId).foreach { exec: LiveExecutor =>
+if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(updates)) {
+  maybeUpdate(exec, now)
+}
+  }
+}
+  }
+
+  override def onStageExecutorMetrics(executorMetrics: 
SparkListenerStageExecutorMetrics): Unit = {
+val now = System.nanoTime()
+
+// check if there is a new peak value for any of the executor level 
memory metrics,
+// while reading from the log. SparkListenerStageExecutorMetrics are 
only processed
+// when reading logs.
+liveExecutors.get(executorMetrics.execId)
+  .orElse(deadExecutors.get(executorMetrics.execId)) match {
+  case Some(exec) =>
--- End diff --

Ok, this can stay as is.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-01 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r207037118
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -669,6 +686,34 @@ private[spark] class AppStatusListener(
 }
   }
 }
+
+// check if there is a new peak value for any of the executor level 
memory metrics
+// for the live UI. SparkListenerExecutorMetricsUpdate events are only 
processed
+// for the live UI.
+event.executorUpdates.foreach { updates: ExecutorMetrics =>
+  liveExecutors.get(event.execId).foreach { exec: LiveExecutor =>
+if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(updates)) {
+  maybeUpdate(exec, now)
+}
+  }
+}
+  }
+
+  override def onStageExecutorMetrics(executorMetrics: 
SparkListenerStageExecutorMetrics): Unit = {
+val now = System.nanoTime()
+
+// check if there is a new peak value for any of the executor level 
memory metrics,
+// while reading from the log. SparkListenerStageExecutorMetrics are 
only processed
+// when reading logs.
+liveExecutors.get(executorMetrics.execId)
+  .orElse(deadExecutors.get(executorMetrics.execId)) match {
+  case Some(exec) =>
--- End diff --

yeah, but you're talking about both a `foreach` *and* an `if` together.

A long time back we discussed using `option.fold` for this, as it is all in 
one function, but we rejected it as being pretty confusing for most developers.

```scala
scala> def foo(x: Option[String]) = x.fold("nada")("some " + _)
foo: (x: Option[String])String

scala> foo(None)
res0: String = nada

scala> foo(Some("blah"))
res1: String = some blah
```
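
For contrast, the `foreach`-plus-`if` shape applied to the hunk above would be (a sketch):

```scala
liveExecutors.get(executorMetrics.execId)
  .orElse(deadExecutors.get(executorMetrics.execId))
  .foreach { exec =>
    if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(
        executorMetrics.executorMetrics)) {
      maybeUpdate(exec, now)
    }
  }
```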


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-01 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r207016169
  
--- Diff: 
core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.metrics
+
+import java.lang.management.{BufferPoolMXBean, ManagementFactory}
+import javax.management.ObjectName
+
+import org.apache.spark.memory.MemoryManager
+
+/**
+ * Executor metric types for executor-level metrics stored in 
ExecutorMetrics.
+ */
+sealed trait ExecutorMetricType {
+  private[spark] def getMetricValue(memoryManager: MemoryManager): Long
--- End diff --

Yup, on my end this is a low-conviction suggestion - we might start feeling 
pain around this as we add more metric types, but for a first pass this is 
probably fine.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-01 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r207004345
  
--- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala ---
@@ -98,14 +103,50 @@ class ExecutorSummary private[spark](
 val removeReason: Option[String],
 val executorLogs: Map[String, String],
 val memoryMetrics: Option[MemoryMetrics],
-val blacklistedInStages: Set[Int])
+val blacklistedInStages: Set[Int],
+@JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer])
+@JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer])
+val peakMemoryMetrics: Option[ExecutorMetrics])
 
 class MemoryMetrics private[spark](
 val usedOnHeapStorageMemory: Long,
 val usedOffHeapStorageMemory: Long,
 val totalOnHeapStorageMemory: Long,
 val totalOffHeapStorageMemory: Long)
 
+/** deserializer for peakMemoryMetrics: convert map to ExecutorMetrics */
+private[spark] class ExecutorMetricsJsonDeserializer
+  extends JsonDeserializer[Option[ExecutorMetrics]] {
+  override def deserialize(
+  jsonParser: JsonParser,
+  deserializationContext: DeserializationContext): 
Option[ExecutorMetrics] = {
+val metricsMap = jsonParser.readValueAs[Option[Map[String, Long]]](
+  new TypeReference[Option[Map[String, java.lang.Long]]] {})
+metricsMap match {
+  case Some(metrics) =>
+Some(new ExecutorMetrics(metrics))
+  case None => None
--- End diff --

same here as Matt's comments: use `metricsMap.map {}`
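
A minimal sketch of that rewrite, using the `metricsMap` from the quoted 
deserializer:

```scala
// Option[Map[String, Long]] -> Option[ExecutorMetrics], no Some/None match
metricsMap.map(new ExecutorMetrics(_))
```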


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-01 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r207004013
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -669,6 +686,34 @@ private[spark] class AppStatusListener(
 }
   }
 }
+
+// check if there is a new peak value for any of the executor level 
memory metrics
+// for the live UI. SparkListenerExecutorMetricsUpdate events are only 
processed
+// for the live UI.
+event.executorUpdates.foreach { updates: ExecutorMetrics =>
+  liveExecutors.get(event.execId).foreach { exec: LiveExecutor =>
+if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(updates)) {
+  maybeUpdate(exec, now)
+}
+  }
+}
+  }
+
+  override def onStageExecutorMetrics(executorMetrics: 
SparkListenerStageExecutorMetrics): Unit = {
+val now = System.nanoTime()
+
+// check if there is a new peak value for any of the executor level 
memory metrics,
+// while reading from the log. SparkListenerStageExecutorMetrics are 
only processed
+// when reading logs.
+liveExecutors.get(executorMetrics.execId)
+  .orElse(deadExecutors.get(executorMetrics.execId)) match {
+  case Some(exec) =>
+ if 
(exec.peakExecutorMetrics.compareAndUpdatePeakValues(executorMetrics.executorMetrics))
 {
+  maybeUpdate(exec, now)
--- End diff --

this is only for replaying from logs, right?  but then, `maybeUpdate` will 
be a no-op, as `live` is false.  So, what is the desired effect here?  But 
maybe I'm missing something, because this is covered by some of the API 
tests.  I think maybe you don't need to call update at all here, and it'll just 
get written to the kvstore as part of the final `flush` call when the kvstore 
is closed after reading the whole log.

also style nit: indentation is off


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-01 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r207004094
  
--- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala ---
@@ -98,14 +103,50 @@ class ExecutorSummary private[spark](
 val removeReason: Option[String],
 val executorLogs: Map[String, String],
 val memoryMetrics: Option[MemoryMetrics],
-val blacklistedInStages: Set[Int])
+val blacklistedInStages: Set[Int],
+@JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer])
+@JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer])
+val peakMemoryMetrics: Option[ExecutorMetrics])
 
 class MemoryMetrics private[spark](
 val usedOnHeapStorageMemory: Long,
 val usedOffHeapStorageMemory: Long,
 val totalOnHeapStorageMemory: Long,
 val totalOffHeapStorageMemory: Long)
 
+/** deserializer for peakMemoryMetrics: convert map to ExecutorMetrics */
+private[spark] class ExecutorMetricsJsonDeserializer
+  extends JsonDeserializer[Option[ExecutorMetrics]] {
--- End diff --

nit: double indent the `extends` line
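
For reference, the double-indented form (four spaces on the continuation line):

```scala
private[spark] class ExecutorMetricsJsonDeserializer
    extends JsonDeserializer[Option[ExecutorMetrics]] {
  // ...
}
```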


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-01 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r20799
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala 
---
@@ -251,6 +261,214 @@ class EventLoggingListenerSuite extends SparkFunSuite 
with LocalSparkContext wit
 }
   }
 
+  /**
+   * Test stage executor metrics logging functionality. This checks that 
peak
+   * values from SparkListenerExecutorMetricsUpdate events during a stage 
are
+   * logged in a StageExecutorMetrics event for each executor at stage 
completion.
+   */
+  private def testStageExecutorMetricsEventLogging() {
+val conf = getLoggingConf(testDirPath, None)
+val logName = "stageExecutorMetrics-test"
+val eventLogger = new EventLoggingListener(logName, None, 
testDirPath.toUri(), conf)
+val listenerBus = new LiveListenerBus(conf)
+
+// expected StageExecutorMetrics, for the given stage id and executor 
id
+val expectedMetricsEvents: Map[(Int, String), 
SparkListenerStageExecutorMetrics] =
--- End diff --

super nit: I'd move these expectations after the events to post; it follows 
a bit more naturally.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-01 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r207006413
  
--- Diff: 
core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.metrics
+
+import java.lang.management.{BufferPoolMXBean, ManagementFactory}
+import javax.management.ObjectName
+
+import org.apache.spark.memory.MemoryManager
+
+/**
+ * Executor metric types for executor-level metrics stored in 
ExecutorMetrics.
+ */
+sealed trait ExecutorMetricType {
+  private[spark] def getMetricValue(memoryManager: MemoryManager): Long
--- End diff --

I appreciate the comparison with this alternative, but I don't see a big 
problem w/ the initial version.  If there were many types for `T` that were 
really different, then I think I'd be more inclined to do that -- we might 
still come to that as more types are added.

(I'm also the one who suggested the current version, so I'm biased)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-01 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r207004997
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.executor
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.metrics.ExecutorMetricType
+
+/**
+ * :: DeveloperApi ::
+ * Metrics tracked for executors and the driver.
+ *
+ * Executor-level metrics are sent from each executor to the driver as 
part of the Heartbeat.
+ */
+@DeveloperApi
+class ExecutorMetrics private[spark] extends Serializable {
+
+  // Metrics are indexed by MetricGetter.values
+  private val metrics = new Array[Long](ExecutorMetricType.values.length)
--- End diff --

Yup that's fine - I did some googling, unfortunately there isn't a great 
way to iterate over fields of a case class. You could create a thin wrapper 
object around the array instead though, if we really think the nicer API is 
worthwhile:

```
case class Metrics(values: Seq[Long]) {
  def someMetric1(): Long = values(0)
  def someMetric2(): Long = values(1)
  // ... one accessor per metric
}
```

Or even this:

```
case class Metrics(metric1: Long, metric2: Long, metric3: Long, ...) {
  def values(): Seq[Long] = Seq(metric1, metric2, metric3, ...)
}
```

The latter would be better because you'd be guaranteed to create the 
struct with the right number of metrics, though such abstractions are not 
necessary by any means.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-01 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r207002859
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -669,6 +686,34 @@ private[spark] class AppStatusListener(
 }
   }
 }
+
+// check if there is a new peak value for any of the executor level 
memory metrics
+// for the live UI. SparkListenerExecutorMetricsUpdate events are only 
processed
+// for the live UI.
+event.executorUpdates.foreach { updates: ExecutorMetrics =>
+  liveExecutors.get(event.execId).foreach { exec: LiveExecutor =>
+if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(updates)) {
+  maybeUpdate(exec, now)
+}
+  }
+}
+  }
+
+  override def onStageExecutorMetrics(executorMetrics: 
SparkListenerStageExecutorMetrics): Unit = {
+val now = System.nanoTime()
+
+// check if there is a new peak value for any of the executor level 
memory metrics,
+// while reading from the log. SparkListenerStageExecutorMetrics are 
only processed
+// when reading logs.
+liveExecutors.get(executorMetrics.execId)
+  .orElse(deadExecutors.get(executorMetrics.execId)) match {
+  case Some(exec) =>
--- End diff --

From the 
[Scaladoc](https://www.scala-lang.org/api/2.10.2/index.html#scala.Option):

> The most idiomatic way to use an scala.Option instance is to treat it as 
> a collection or monad and use map, flatMap, filter, or foreach

It's probably better to follow the Scala conventions here.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-01 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r206998585
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -669,6 +686,34 @@ private[spark] class AppStatusListener(
 }
   }
 }
+
+// check if there is a new peak value for any of the executor level 
memory metrics
+// for the live UI. SparkListenerExecutorMetricsUpdate events are only 
processed
+// for the live UI.
+event.executorUpdates.foreach { updates: ExecutorMetrics =>
+  liveExecutors.get(event.execId).foreach { exec: LiveExecutor =>
+if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(updates)) {
+  maybeUpdate(exec, now)
+}
+  }
+}
+  }
+
+  override def onStageExecutorMetrics(executorMetrics: 
SparkListenerStageExecutorMetrics): Unit = {
+val now = System.nanoTime()
+
+// check if there is a new peak value for any of the executor level 
memory metrics,
+// while reading from the log. SparkListenerStageExecutorMetrics are 
only processed
+// when reading logs.
+liveExecutors.get(executorMetrics.execId)
+  .orElse(deadExecutors.get(executorMetrics.execId)) match {
+  case Some(exec) =>
--- End diff --

I don't think we really make a point about this code style.  In general we 
prefer the functional versions, but I actually find a match on Some & None 
totally fine.  There are plenty of other examples of this in the code base 
(though that doesn't necessarily mean it's the right style ...)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-01 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r206995945
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.executor
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.metrics.ExecutorMetricType
+
+/**
+ * :: DeveloperApi ::
+ * Metrics tracked for executors and the driver.
+ *
+ * Executor-level metrics are sent from each executor to the driver as 
part of the Heartbeat.
+ */
+@DeveloperApi
+class ExecutorMetrics private[spark] extends Serializable {
+
+  // Metrics are indexed by MetricGetter.values
+  private val metrics = new Array[Long](ExecutorMetricType.values.length)
--- End diff --

I suggested this earlier in the reviews.  Most of the operations for 
dealing with this data want to iterate over all the fields.  It's much easier 
this way vs. having a bazillion

```scala
if (x.fizz > y.fizz) { 
  y.fizz = x.fizz
}
if (x.buzz > y.buzz) {
  y.buzz = x.buzz
}
...
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-07-30 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r206335138
  
--- Diff: core/src/main/scala/org/apache/spark/util/JsonProtocol.scala ---
@@ -691,7 +723,19 @@ private[spark] object JsonProtocol {
 (json \ "Accumulator 
Updates").extract[List[JValue]].map(accumulableInfoFromJson)
   (taskId, stageId, stageAttemptId, updates)
 }
-SparkListenerExecutorMetricsUpdate(execInfo, accumUpdates)
+val executorUpdates = jsonOption(json \ "Executor Metrics Updated") 
match {
--- End diff --

`Option.map`
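
A sketch of the `Option.map` form; `executorMetricsFromJson` is a hypothetical 
stand-in for the parsing body, which is cut off in the quoted diff:

```scala
val executorUpdates = jsonOption(json \ "Executor Metrics Updated")
  .map(executorMetricsFromJson) // hypothetical parser for the metrics payload
```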


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-07-30 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r206333086
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -93,6 +95,9 @@ private[spark] class EventLoggingListener(
   // Visible for tests only.
   private[scheduler] val logPath = getLogPath(logBaseDir, appId, 
appAttemptId, compressionCodecName)
 
+  // map of (stageId, stageAttempt), to peak executor metrics for the stage
+  private val liveStageExecutorMetrics = HashMap[(Int, Int), 
HashMap[String, ExecutorMetrics]]()
--- End diff --

Do these have to be `HashMap`s or can they be just `mutable.Map`s? You can 
instantiate with `mutable.Map.empty[..., ...]`.
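
For example, with the key and value types from the quoted diff:

```scala
// map of (stageId, stageAttempt) to per-executor peak metrics for the stage
private val liveStageExecutorMetrics =
  mutable.Map.empty[(Int, Int), mutable.Map[String, ExecutorMetrics]]
```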


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-07-30 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r206312334
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.executor
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.metrics.ExecutorMetricType
+
+/**
+ * :: DeveloperApi ::
+ * Metrics tracked for executors and the driver.
+ *
+ * Executor-level metrics are sent from each executor to the driver as 
part of the Heartbeat.
+ */
+@DeveloperApi
+class ExecutorMetrics private[spark] extends Serializable {
+
+  // Metrics are indexed by MetricGetter.values
+  private val metrics = new Array[Long](ExecutorMetricType.values.length)
--- End diff --

Out of curiosity, why are we using an array here with index-based fetching? 
We could use a struct / case class to represent these metrics. But I suppose 
the size of the payload we send is smaller if we use an Array, and we don't 
want to pay serialization costs?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-07-30 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r206334599
  
--- Diff: core/src/main/scala/org/apache/spark/status/LiveEntity.scala ---
@@ -302,10 +305,10 @@ private class LiveExecutor(val executorId: String, 
_addTime: Long) extends LiveE
   Option(removeReason),
   executorLogs,
   memoryMetrics,
-  blacklistedInStages)
+  blacklistedInStages,
+  if (peakExecutorMetrics.isSet()) Some(peakExecutorMetrics) else None)
--- End diff --

`Some(peakExecutorMetrics).filter(_.isSet)`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-07-30 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r206333224
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -155,7 +160,14 @@ private[spark] class EventLoggingListener(
   }
 
   // Events that do not trigger a flush
-  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit 
= logEvent(event)
+  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit 
= {
+logEvent(event)
+if (shouldLogStageExecutorMetrics) {
+  // record the peak metrics for the new stage
+  liveStageExecutorMetrics.put((event.stageInfo.stageId, 
event.stageInfo.attemptNumber()),
+new HashMap[String, ExecutorMetrics]())
--- End diff --

`mutable.Map.empty[String, ExecutorMetrics]` if the above signature is 
changed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-07-30 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r206334329
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -669,6 +686,34 @@ private[spark] class AppStatusListener(
 }
   }
 }
+
+// check if there is a new peak value for any of the executor level 
memory metrics
+// for the live UI. SparkListenerExecutorMetricsUpdate events are only 
processed
+// for the live UI.
+event.executorUpdates.foreach { updates: ExecutorMetrics =>
+  liveExecutors.get(event.execId).foreach { exec: LiveExecutor =>
+if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(updates)) {
+  maybeUpdate(exec, now)
+}
+  }
+}
+  }
+
+  override def onStageExecutorMetrics(executorMetrics: 
SparkListenerStageExecutorMetrics): Unit = {
+val now = System.nanoTime()
+
+// check if there is a new peak value for any of the executor level 
memory metrics,
+// while reading from the log. SparkListenerStageExecutorMetrics are 
only processed
+// when reading logs.
+liveExecutors.get(executorMetrics.execId)
+  .orElse(deadExecutors.get(executorMetrics.execId)) match {
+  case Some(exec) =>
--- End diff --

Don't case compare with `Some` and `None` - for options prefer the 
functional equivalents, and occasionally `isEmpty` / `isDefined` can be used. 
We can do this here:

```
val opt = liveExecutors.get(...).orElse(...)
opt.foreach { ...}
if (opt.isEmpty) {
  logWarning(...)
}
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-07-30 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r206334970
  
--- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala ---
@@ -98,14 +103,50 @@ class ExecutorSummary private[spark](
 val removeReason: Option[String],
 val executorLogs: Map[String, String],
 val memoryMetrics: Option[MemoryMetrics],
-val blacklistedInStages: Set[Int])
+val blacklistedInStages: Set[Int],
+@JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer])
+@JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer])
+val peakMemoryMetrics: Option[ExecutorMetrics])
 
 class MemoryMetrics private[spark](
 val usedOnHeapStorageMemory: Long,
 val usedOffHeapStorageMemory: Long,
 val totalOnHeapStorageMemory: Long,
 val totalOffHeapStorageMemory: Long)
 
+/** deserializer for peakMemoryMetrics: convert map to ExecutorMetrics */
+private[spark] class ExecutorMetricsJsonDeserializer
+  extends JsonDeserializer[Option[ExecutorMetrics]] {
+  override def deserialize(
+  jsonParser: JsonParser,
+  deserializationContext: DeserializationContext): 
Option[ExecutorMetrics] = {
+val metricsMap = jsonParser.readValueAs[Option[Map[String, Long]]](
+  new TypeReference[Option[Map[String, java.lang.Long]]] {})
+metricsMap match {
+  case Some(metrics) =>
+Some(new ExecutorMetrics(metrics))
+  case None => None
+}
+  }
+}
+/** serializer for peakMemoryMetrics: convert ExecutorMetrics to map with 
metric name as key */
+private[spark] class ExecutorMetricsJsonSerializer
+  extends JsonSerializer[Option[ExecutorMetrics]] {
--- End diff --

If this is empty, does it serialize as `null` or does it not serialize at 
all? Or does it serialize some other token like `empty`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-07-30 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r206312790
  
--- Diff: core/src/main/scala/org/apache/spark/memory/MemoryManager.scala 
---
@@ -180,6 +180,26 @@ private[spark] abstract class MemoryManager(
 onHeapStorageMemoryPool.memoryUsed + 
offHeapStorageMemoryPool.memoryUsed
   }
 
+  /**
+   *  On heap execution memory currently in use, in bytes.
+   */
+  final def onHeapExecutionMemoryUsed: Long = 
onHeapExecutionMemoryPool.memoryUsed
--- End diff --

Do these have to be `synchronized`?
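
If they do, a minimal sketch of what that would look like for one of the 
accessors:

```scala
/**
 *  On heap execution memory currently in use, in bytes.
 */
final def onHeapExecutionMemoryUsed: Long = synchronized {
  onHeapExecutionMemoryPool.memoryUsed
}
```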


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-07-30 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r206333488
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -296,7 +338,7 @@ private[spark] object EventLoggingListener extends 
Logging {
   private val LOG_FILE_PERMISSIONS = new 
FsPermission(Integer.parseInt("770", 8).toShort)
 
   // A cache for compression codecs to avoid creating the same codec many 
times
-  private val codecMap = new mutable.HashMap[String, CompressionCodec]
+  private val codecMap = new HashMap[String, CompressionCodec]
--- End diff --

`mutable.Map.empty` - prefer this everywhere.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-07-30 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r206335296
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala ---
@@ -217,7 +218,12 @@ class ReplayListenerSuite extends SparkFunSuite with 
BeforeAndAfter with LocalSp
 
 // Verify the same events are replayed in the same order
 assert(sc.eventLogger.isDefined)
-val originalEvents = sc.eventLogger.get.loggedEvents
+val originalEvents = sc.eventLogger.get.loggedEvents.filter { e =>
+  JsonProtocol.sparkEventFromJson(e) match {
--- End diff --

`isInstanceOf` may be cleaner here.
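
For example (the match body is cut off in the quoted diff, so the event type 
being filtered out is an assumption):

```scala
val originalEvents = sc.eventLogger.get.loggedEvents.filter { e =>
  // assumed: drop the new stage executor metrics events before comparing
  !JsonProtocol.sparkEventFromJson(e).isInstanceOf[SparkListenerStageExecutorMetrics]
}
```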


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-07-30 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r206331772
  
--- Diff: 
core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.metrics
+
+import java.lang.management.{BufferPoolMXBean, ManagementFactory}
+import javax.management.ObjectName
+
+import org.apache.spark.memory.MemoryManager
+
+/**
+ * Executor metric types for executor-level metrics stored in 
ExecutorMetrics.
+ */
+sealed trait ExecutorMetricType {
+  private[spark] def getMetricValue(memoryManager: MemoryManager): Long
--- End diff --

A few minor flags with this design could be:
- When passing the memory manager to the bean-sourced metrics provider, the 
memory manager ends up being unused
- There's an inconsistency with the fact that the metric types that use the 
memory manager accept the memory manager as an argument, but the metric types 
that use the memory beans construct the beans as singletons.

The following is a proposal that doesn't have the above inconsistencies, 
but is a bit more complex. Open to discussion on whether the extra complexity 
is worthwhile here.

```
sealed trait ExecutorMetricType[T] {
  private[spark] def getMetricValue(src: T): Long
  private[spark] def name: String
}

case class MBeanMetricType(mbeanName: String) extends ExecutorMetricType[BufferPoolMXBean] {
  override def name: String = ??? // derive some name from the bean name, most likely
  override def getMetricValue(bean: BufferPoolMXBean): Long = {
    bean.getMemoryUsed
  }
}

case object OnHeapExecutionMemory extends ExecutorMetricType[MemoryManager] {
  override def name: String = "OnHeapExecution"
  override def getMetricValue(memoryManager: MemoryManager): Long =
    memoryManager.onHeapExecutionMemoryUsed
}

private[spark] object ExecutorMetricType {
  val memoryManagerMetrics = IndexedSeq(
    OnHeapExecutionMemory,
    OffHeapExecutionMemory
    // Add all subtypes of ExecutorMetricType[MemoryManager] here.
  )

  private val BUFFER_POOL_BEAN_NAME = "java.nio:type=BufferPool,name=direct"
  private val MAPPED_POOL_BEAN_NAME = "java.nio:type=BufferPool,name=mapped"

  val mbeanMetrics = IndexedSeq(
    (MBeanMetricType(BUFFER_POOL_BEAN_NAME), getBean(BUFFER_POOL_BEAN_NAME)),
    (MBeanMetricType(MAPPED_POOL_BEAN_NAME), getBean(MAPPED_POOL_BEAN_NAME)))

  val values = memoryManagerMetrics ++ mbeanMetrics.map(_._1) // Just for length? There might be a simpler way

  def getMetricsSummary(memoryManager: MemoryManager): ExecutorMetrics = {
    val allMetrics = new Array[Long](values.length)
    memoryManagerMetrics.zipWithIndex.foreach { case (metric, index) =>
      allMetrics(index) = metric.getMetricValue(memoryManager)
    }
    mbeanMetrics.zipWithIndex.foreach { case ((metric, bean), index) =>
      allMetrics(index + memoryManagerMetrics.length) = metric.getMetricValue(bean)
    }
    new ExecutorMetrics(allMetrics)
  }

  private def getBean(beanName: String): BufferPoolMXBean = {
    ManagementFactory.newPlatformMXBeanProxy(
      ManagementFactory.getPlatformMBeanServer,
      new ObjectName(beanName).toString,
      classOf[BufferPoolMXBean])
  }
}
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-07-30 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r206334704
  
--- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala ---
@@ -98,14 +103,50 @@ class ExecutorSummary private[spark](
 val removeReason: Option[String],
 val executorLogs: Map[String, String],
 val memoryMetrics: Option[MemoryMetrics],
-val blacklistedInStages: Set[Int])
+val blacklistedInStages: Set[Int],
+@JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer])
+@JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer])
+val peakMemoryMetrics: Option[ExecutorMetrics])
 
 class MemoryMetrics private[spark](
 val usedOnHeapStorageMemory: Long,
 val usedOffHeapStorageMemory: Long,
 val totalOnHeapStorageMemory: Long,
 val totalOffHeapStorageMemory: Long)
 
+/** deserializer for peakMemoryMetrics: convert map to ExecutorMetrics */
+private[spark] class ExecutorMetricsJsonDeserializer
+  extends JsonDeserializer[Option[ExecutorMetrics]] {
+  override def deserialize(
+  jsonParser: JsonParser,
+  deserializationContext: DeserializationContext): 
Option[ExecutorMetrics] = {
+val metricsMap = jsonParser.readValueAs[Option[Map[String, Long]]](
+  new TypeReference[Option[Map[String, java.lang.Long]]] {})
+metricsMap match {
+  case Some(metrics) =>
+Some(new ExecutorMetrics(metrics))
+  case None => None
+}
+  }
+}
+/** serializer for peakMemoryMetrics: convert ExecutorMetrics to map with 
metric name as key */
+private[spark] class ExecutorMetricsJsonSerializer
+  extends JsonSerializer[Option[ExecutorMetrics]] {
+  override def serialize(
+  metrics: Option[ExecutorMetrics],
+  jsonGenerator: JsonGenerator,
+  serializerProvider: SerializerProvider): Unit = {
+metrics match {
+  case Some(m) =>
--- End diff --

Don't use `Some` and `None` case matching - prefer `foreach`.
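
A sketch of the `foreach` form; `writeMetricsMap` is a hypothetical helper 
standing in for the map-building body of the quoted serializer:

```scala
override def serialize(
    metrics: Option[ExecutorMetrics],
    jsonGenerator: JsonGenerator,
    serializerProvider: SerializerProvider): Unit = {
  // write only when a value is present; an empty Option writes nothing
  metrics.foreach(m => writeMetricsMap(m, jsonGenerator))
}
```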


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-07-25 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r205095575
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala 
---
@@ -251,6 +261,215 @@ class EventLoggingListenerSuite extends SparkFunSuite 
with LocalSparkContext wit
 }
   }
 
+  /**
+   * Test stage executor metrics logging functionality. This checks that 
peak
+   * values from SparkListenerExecutorMetricsUpdate events during a stage 
are
+   * logged in a StageExecutorMetrics event for each executor at stage 
completion.
+   */
+  private def testStageExecutorMetricsEventLogging() {
+val conf = getLoggingConf(testDirPath, None)
+val logName = "stageExecutorMetrics-test"
+val eventLogger = new EventLoggingListener(logName, None, 
testDirPath.toUri(), conf)
+val listenerBus = new LiveListenerBus(conf)
+
+// expected StageExecutorMetrics, for the given stage id and executor 
id
+val expectedMetricsEvents: Map[(Int, String), 
SparkListenerStageExecutorMetrics] =
+  Map(
+((0, "1"),
+  new SparkListenerStageExecutorMetrics("1", 0, 0,
+  Array(5000L, 50L, 50L, 20L, 50L, 10L, 100L, 30L, 70L, 20L))),
+((0, "2"),
+  new SparkListenerStageExecutorMetrics("2", 0, 0,
+  Array(7000L, 70L, 50L, 20L, 10L, 10L, 50L, 30L, 80L, 40L))),
+((1, "1"),
+  new SparkListenerStageExecutorMetrics("1", 1, 0,
+  Array(7000L, 70L, 50L, 30L, 60L, 30L, 80L, 55L, 50L, 0L))),
+((1, "2"),
+  new SparkListenerStageExecutorMetrics("2", 1, 0,
+  Array(7000L, 70L, 50L, 40L, 10L, 30L, 50L, 60L, 40L, 40L
+
+// Events to post.
+val events = Array(
+  SparkListenerApplicationStart("executionMetrics", None,
+1L, "update", None),
+  createExecutorAddedEvent(1),
+  createExecutorAddedEvent(2),
+  createStageSubmittedEvent(0),
+  // receive 3 metric updates from each executor with just stage 0 
running,
+  // with different peak updates for each executor
+  createExecutorMetricsUpdateEvent(1,
+  Array(4000L, 50L, 20L, 0L, 40L, 0L, 60L, 0L, 70L, 20L)),
+  createExecutorMetricsUpdateEvent(2,
+  Array(1500L, 50L, 20L, 0L, 0L, 0L, 20L, 0L, 70L, 0L)),
+  // exec 1: new stage 0 peaks for metrics at indexes: 2, 4, 6
+  createExecutorMetricsUpdateEvent(1,
+  Array(4000L, 50L, 50L, 0L, 50L, 0L, 100L, 0L, 70L, 20L)),
+  // exec 2: new stage 0 peaks for metrics at indexes: 0, 4, 6
+  createExecutorMetricsUpdateEvent(2,
+  Array(2000L, 50L, 10L, 0L, 10L, 0L, 30L, 0L, 70L, 0L)),
+  // exec 1: new stage 0 peaks for metrics at indexes: 5, 7
+  createExecutorMetricsUpdateEvent(1,
+  Array(2000L, 40L, 50L, 0L, 40L, 10L, 90L, 10L, 50L, 0L)),
+  // exec 2: new stage 0 peaks for metrics at indexes: 0, 5, 6, 7, 8
+  createExecutorMetricsUpdateEvent(2,
+  Array(3500L, 50L, 15L, 0L, 10L, 10L, 35L, 10L, 80L, 0L)),
+  // now start stage 1, one more metric update for each executor, and 
new
+  // peaks for some stage 1 metrics (as listed), initialize stage 1 
peaks
+  createStageSubmittedEvent(1),
+  // exec 1: new stage 0 peaks for metrics at indexes: 0, 3, 7
--- End diff --

Stage 0 is still running, and these are new peaks for that stage. It is 
also initializing all the stage 1 metric values, since these are the first 
executor metrics seen for stage 1 (I'll add this to the comments).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-07-24 Thread rezasafi
Github user rezasafi commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r204976606
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala 
---
@@ -251,6 +261,215 @@ class EventLoggingListenerSuite extends SparkFunSuite 
with LocalSparkContext wit
 }
   }
 
+  /**
+   * Test stage executor metrics logging functionality. This checks that 
peak
+   * values from SparkListenerExecutorMetricsUpdate events during a stage 
are
+   * logged in a StageExecutorMetrics event for each executor at stage 
completion.
+   */
+  private def testStageExecutorMetricsEventLogging() {
+val conf = getLoggingConf(testDirPath, None)
+val logName = "stageExecutorMetrics-test"
+val eventLogger = new EventLoggingListener(logName, None, 
testDirPath.toUri(), conf)
+val listenerBus = new LiveListenerBus(conf)
+
+// expected StageExecutorMetrics, for the given stage id and executor 
id
+val expectedMetricsEvents: Map[(Int, String), 
SparkListenerStageExecutorMetrics] =
+  Map(
+((0, "1"),
+  new SparkListenerStageExecutorMetrics("1", 0, 0,
+  Array(5000L, 50L, 50L, 20L, 50L, 10L, 100L, 30L, 70L, 20L))),
+((0, "2"),
+  new SparkListenerStageExecutorMetrics("2", 0, 0,
+  Array(7000L, 70L, 50L, 20L, 10L, 10L, 50L, 30L, 80L, 40L))),
+((1, "1"),
+  new SparkListenerStageExecutorMetrics("1", 1, 0,
+  Array(7000L, 70L, 50L, 30L, 60L, 30L, 80L, 55L, 50L, 0L))),
+((1, "2"),
+  new SparkListenerStageExecutorMetrics("2", 1, 0,
+  Array(7000L, 70L, 50L, 40L, 10L, 30L, 50L, 60L, 40L, 40L
+
+// Events to post.
+val events = Array(
+  SparkListenerApplicationStart("executionMetrics", None,
+1L, "update", None),
+  createExecutorAddedEvent(1),
+  createExecutorAddedEvent(2),
+  createStageSubmittedEvent(0),
+  // receive 3 metric updates from each executor with just stage 0 
running,
+  // with different peak updates for each executor
+  createExecutorMetricsUpdateEvent(1,
+  Array(4000L, 50L, 20L, 0L, 40L, 0L, 60L, 0L, 70L, 20L)),
+  createExecutorMetricsUpdateEvent(2,
+  Array(1500L, 50L, 20L, 0L, 0L, 0L, 20L, 0L, 70L, 0L)),
+  // exec 1: new stage 0 peaks for metrics at indexes: 2, 4, 6
+  createExecutorMetricsUpdateEvent(1,
+  Array(4000L, 50L, 50L, 0L, 50L, 0L, 100L, 0L, 70L, 20L)),
+  // exec 2: new stage 0 peaks for metrics at indexes: 0, 4, 6
+  createExecutorMetricsUpdateEvent(2,
+  Array(2000L, 50L, 10L, 0L, 10L, 0L, 30L, 0L, 70L, 0L)),
+  // exec 1: new stage 0 peaks for metrics at indexes: 5, 7
+  createExecutorMetricsUpdateEvent(1,
+  Array(2000L, 40L, 50L, 0L, 40L, 10L, 90L, 10L, 50L, 0L)),
+  // exec 2: new stage 0 peaks for metrics at indexes: 0, 5, 6, 7, 8
+  createExecutorMetricsUpdateEvent(2,
+  Array(3500L, 50L, 15L, 0L, 10L, 10L, 35L, 10L, 80L, 0L)),
+  // now start stage 1, one more metric update for each executor, and 
new
+  // peaks for some stage 1 metrics (as listed), initialize stage 1 
peaks
+  createStageSubmittedEvent(1),
+  // exec 1: new stage 0 peaks for metrics at indexes: 0, 3, 7
--- End diff --

Are this comment and the one in line 322 correct? Shouldn't it say stage 1?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-07-18 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r203520320
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala ---
@@ -160,11 +160,29 @@ case class 
SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends
  * Periodic updates from executors.
  * @param execId executor id
  * @param accumUpdates sequence of (taskId, stageId, stageAttemptId, 
accumUpdates)
+ * @param executorUpdates executor level metrics updates
  */
 @DeveloperApi
 case class SparkListenerExecutorMetricsUpdate(
 execId: String,
-accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])])
+accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])],
+executorUpdates: Option[Array[Long]] = None)
+  extends SparkListenerEvent
+
+/**
+ * Peak metric values for the executor for the stage, written to the 
history log at stage
+ * completion.
+ * @param execId executor id
+ * @param stageId stage id
+ * @param stageAttemptId stage attempt
+ * @param executorMetrics executor level metrics, indexed by 
MetricGetter.values
+ */
+@DeveloperApi
+case class SparkListenerStageExecutorMetrics(
+execId: String,
+stageId: Int,
+stageAttemptId: Int,
+executorMetrics: Array[Long])
--- End diff --

eh ... actually just realized EnumMap is always mutable.  If we want the 
public API to be totally immutable, back to a private field & constructor, and a 
getter.  So we might as well keep the field an `Array[Long]` since it's private 
anyway.  IMO it doesn't really matter if `ExecutorMetricType` is an enum or a 
sealed scala trait.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-07-18 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r203503691
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala ---
@@ -160,11 +160,29 @@ case class 
SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends
  * Periodic updates from executors.
  * @param execId executor id
  * @param accumUpdates sequence of (taskId, stageId, stageAttemptId, 
accumUpdates)
+ * @param executorUpdates executor level metrics updates
  */
 @DeveloperApi
 case class SparkListenerExecutorMetricsUpdate(
 execId: String,
-accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])])
+accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])],
+executorUpdates: Option[Array[Long]] = None)
+  extends SparkListenerEvent
+
+/**
+ * Peak metric values for the executor for the stage, written to the 
history log at stage
+ * completion.
+ * @param execId executor id
+ * @param stageId stage id
+ * @param stageAttemptId stage attempt
+ * @param executorMetrics executor level metrics, indexed by 
MetricGetter.values
+ */
+@DeveloperApi
+case class SparkListenerStageExecutorMetrics(
+execId: String,
+stageId: Int,
+stageAttemptId: Int,
+executorMetrics: Array[Long])
--- End diff --

This sounds like a good solution, with both a clean API and an 
efficient implementation that will be easier to add new metrics to. Thanks!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-07-18 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r203489913
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala ---
@@ -160,11 +160,29 @@ case class 
SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends
  * Periodic updates from executors.
  * @param execId executor id
  * @param accumUpdates sequence of (taskId, stageId, stageAttemptId, 
accumUpdates)
+ * @param executorUpdates executor level metrics updates
  */
 @DeveloperApi
 case class SparkListenerExecutorMetricsUpdate(
 execId: String,
-accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])])
+accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])],
+executorUpdates: Option[Array[Long]] = None)
+  extends SparkListenerEvent
+
+/**
+ * Peak metric values for the executor for the stage, written to the 
history log at stage
+ * completion.
+ * @param execId executor id
+ * @param stageId stage id
+ * @param stageAttemptId stage attempt
+ * @param executorMetrics executor level metrics, indexed by 
MetricGetter.values
+ */
+@DeveloperApi
+case class SparkListenerStageExecutorMetrics(
+execId: String,
+stageId: Int,
+stageAttemptId: Int,
+executorMetrics: Array[Long])
--- End diff --

I am completely sold on the enum-like idea.
My main concern was around avoiding `MatchError`s in scala and the other 
potential failures you elaborated on above.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-07-18 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r203455379
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala ---
@@ -160,11 +160,29 @@ case class 
SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends
  * Periodic updates from executors.
  * @param execId executor id
  * @param accumUpdates sequence of (taskId, stageId, stageAttemptId, 
accumUpdates)
+ * @param executorUpdates executor level metrics updates
  */
 @DeveloperApi
 case class SparkListenerExecutorMetricsUpdate(
 execId: String,
-accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])])
+accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])],
+executorUpdates: Option[Array[Long]] = None)
+  extends SparkListenerEvent
+
+/**
+ * Peak metric values for the executor for the stage, written to the 
history log at stage
+ * completion.
+ * @param execId executor id
+ * @param stageId stage id
+ * @param stageAttemptId stage attempt
+ * @param executorMetrics executor level metrics, indexed by 
MetricGetter.values
+ */
+@DeveloperApi
+case class SparkListenerStageExecutorMetrics(
+execId: String,
+stageId: Int,
+stageAttemptId: Int,
+executorMetrics: Array[Long])
--- End diff --

re: compatibility -- the json already uses names, so is not dependent on 
enum order.  we always require the same code for the values sent between the 
driver & executor, so that isn't a concern.  For the user accessing these 
values with `getMetricValue()` -- deleting an enum would be the same as 
deleting a field, so yeah it would break compatibility.  technically allowed 
for a "developerapi" but something we should avoid.  Adding enums should be OK. 
 If the enums are re-ordered in the spark code, but the user compiles against 
an older version ... I *think* it should be OK, as we'd look up the index in 
the actual spark code at runtime.

btw, I'm using "enum" loosely, I don't actually mean a java enum, I mean 
more the general concept, as its implemented in the current diff.  A fixed set 
of constants, with a helper to get all of them in order.  We could switch to 
using java enums -- I actually started that 
(https://github.com/apache/spark/pull/21221/commits/8b74ba8fff21b499e7cc9d93f9864831aa29773e),
 but changed it in the end.  honestly I forget why -- I think it was because I 
decided I wanted scala scoping constructs and more flexible code organization 
or something along those lines, and java's enum didn't really buy me much more. 
 The `executorMetrics` field here is basically an `EnumMap`, but it can 
actually do primitives.  That matters more for the internal messages, than here 
in the public spark listener api.

anyway, I think there *does* need to be some developer api which exposes 
all of the MetricGetter/ExecutorMetricType values.  I don't really care whether 
that is a java enum, or the home-grown version here.  I'm flexible on 
specifics, but my suggestion: an `ExecutorMetricType` enum that is a 
developerapi; make the SparkListener api expose the values as `executorMetrics: 
EnumMap[ExecutorMetricType, Long]`; internally, still use `Array[Long]`, and 
have `MetricGetter` contain the code which knows how to take an 
executorMetricType and determine its current value.  that would make the public 
api more self-documenting and clean, keep things efficient and clean 
internally, and also allow us to separate out the public list of metrics from 
the internal logic to determine current values, without having to play too many 
games with package-private access.
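
A rough sketch of that API shape (names and the index lookup are assumptions, 
not the final implementation):

```scala
// public, immutable view keyed by metric type; storage stays a primitive array
class ExecutorMetrics private[spark] (private val metrics: Array[Long]) {
  def getMetricValue(metricType: ExecutorMetricType): Long =
    metrics(ExecutorMetricType.values.indexOf(metricType))
}
```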


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-07-18 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r203319952
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala ---
@@ -160,11 +160,29 @@ case class 
SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends
  * Periodic updates from executors.
  * @param execId executor id
  * @param accumUpdates sequence of (taskId, stageId, stageAttemptId, 
accumUpdates)
+ * @param executorUpdates executor level metrics updates
  */
 @DeveloperApi
 case class SparkListenerExecutorMetricsUpdate(
 execId: String,
-accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])])
+accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])],
+executorUpdates: Option[Array[Long]] = None)
+  extends SparkListenerEvent
+
+/**
+ * Peak metric values for the executor for the stage, written to the 
history log at stage
+ * completion.
+ * @param execId executor id
+ * @param stageId stage id
+ * @param stageAttemptId stage attempt
+ * @param executorMetrics executor level metrics, indexed by 
MetricGetter.values
+ */
+@DeveloperApi
+case class SparkListenerStageExecutorMetrics(
+execId: String,
+stageId: Int,
+stageAttemptId: Int,
+executorMetrics: Array[Long])
--- End diff --

+1 on enums @squito!
The only concern would be evolving the enums in a later release - changing 
an enum could result in source incompatibility.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-07-17 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r203122722
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala ---
@@ -160,11 +160,29 @@ case class 
SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends
  * Periodic updates from executors.
  * @param execId executor id
  * @param accumUpdates sequence of (taskId, stageId, stageAttemptId, 
accumUpdates)
+ * @param executorUpdates executor level metrics updates
  */
 @DeveloperApi
 case class SparkListenerExecutorMetricsUpdate(
 execId: String,
-accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])])
+accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])],
+executorUpdates: Option[Array[Long]] = None)
+  extends SparkListenerEvent
+
+/**
+ * Peak metric values for the executor for the stage, written to the 
history log at stage
+ * completion.
+ * @param execId executor id
+ * @param stageId stage id
+ * @param stageAttemptId stage attempt
+ * @param executorMetrics executor level metrics, indexed by 
MetricGetter.values
+ */
+@DeveloperApi
+case class SparkListenerStageExecutorMetrics(
+execId: String,
+stageId: Int,
+stageAttemptId: Int,
+executorMetrics: Array[Long])
--- End diff --

Adding getMetricValue() would abstract away the array implementation. 
Should MetricGetter/ExecutorMetricType be public?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-07-17 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r203100978
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala ---
@@ -160,11 +160,29 @@ case class 
SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends
  * Periodic updates from executors.
  * @param execId executor id
  * @param accumUpdates sequence of (taskId, stageId, stageAttemptId, 
accumUpdates)
+ * @param executorUpdates executor level metrics updates
  */
 @DeveloperApi
 case class SparkListenerExecutorMetricsUpdate(
 execId: String,
-accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])])
+accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])],
+executorUpdates: Option[Array[Long]] = None)
+  extends SparkListenerEvent
+
+/**
+ * Peak metric values for the executor for the stage, written to the 
history log at stage
+ * completion.
+ * @param execId executor id
+ * @param stageId stage id
+ * @param stageAttemptId stage attempt
+ * @param executorMetrics executor level metrics, indexed by 
MetricGetter.values
+ */
+@DeveloperApi
+case class SparkListenerStageExecutorMetrics(
+execId: String,
+stageId: Int,
+stageAttemptId: Int,
+executorMetrics: Array[Long])
--- End diff --

sorry was out for a bit, catching up now -- 

so I think these are valid concerns, but I'd really like to have an 
approach which avoids all the getFooBar() methods -- it's more error prone 
internally with all the places you've got to make sure you match things up 
exactly, and even for external consumers it's nice to have an easy way to 
iterate over everything.  I think we can satisfy all concerns by making the 
field & constructor private and adding a `getMetricValue(metric: 
MetricGetter)`.  perhaps we should also rename `MetricGetter` -- externally its 
more like `ExecutorMetricType`.  Users shouldn't know anything about its 
ability to grab the value of the metric in a running executor.

I'm not totally stuck on this, but I'd certainly appreciate it -- I'm very 
partial to enums, but realize not everyone sees the beauty of them like I do :) 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-07-07 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r200826235
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala ---
@@ -160,11 +160,29 @@ case class 
SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends
  * Periodic updates from executors.
  * @param execId executor id
  * @param accumUpdates sequence of (taskId, stageId, stageAttemptId, 
accumUpdates)
+ * @param executorUpdates executor level metrics updates
  */
 @DeveloperApi
 case class SparkListenerExecutorMetricsUpdate(
 execId: String,
-accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])])
+accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])],
+executorUpdates: Option[Array[Long]] = None)
+  extends SparkListenerEvent
+
+/**
+ * Peak metric values for the executor for the stage, written to the 
history log at stage
+ * completion.
+ * @param execId executor id
+ * @param stageId stage id
+ * @param stageAttemptId stage attempt
+ * @param executorMetrics executor level metrics, indexed by 
MetricGetter.values
+ */
+@DeveloperApi
+case class SparkListenerStageExecutorMetrics(
+execId: String,
+stageId: Int,
+stageAttemptId: Int,
+executorMetrics: Array[Long])
--- End diff --

We can change back to using an ExecutorMetrics class in this case.

The plan was for any new metrics to be appended at the end, so the ordering of existing metrics wouldn't change, and executorMetrics could be changed to an immutable Seq[Long]. There would still be the issue of having to reference MetricGetter to find out how the metrics are indexed, though.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-07-07 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r200805499
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala ---
@@ -160,11 +160,29 @@ case class 
SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends
  * Periodic updates from executors.
  * @param execId executor id
  * @param accumUpdates sequence of (taskId, stageId, stageAttemptId, 
accumUpdates)
+ * @param executorUpdates executor level metrics updates
  */
 @DeveloperApi
 case class SparkListenerExecutorMetricsUpdate(
 execId: String,
-accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])])
+accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])],
+executorUpdates: Option[Array[Long]] = None)
+  extends SparkListenerEvent
+
+/**
+ * Peak metric values for the executor for the stage, written to the 
history log at stage
+ * completion.
+ * @param execId executor id
+ * @param stageId stage id
+ * @param stageAttemptId stage attempt
+ * @param executorMetrics executor level metrics, indexed by 
MetricGetter.values
+ */
+@DeveloperApi
+case class SparkListenerStageExecutorMetrics(
+execId: String,
+stageId: Int,
+stageAttemptId: Int,
+executorMetrics: Array[Long])
--- End diff --

We cannot expose an array of Longs in a developer API (mutability).
In addition, we cannot have users needing to reference private Spark APIs to understand the meaning of the values - particularly when the ordering can change in subsequent versions of Spark.
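
A quick worksheet-style illustration of the mutability problem (hypothetical listener code, using the case class from the diff above):

```scala
// The case class as proposed in the diff above:
case class SparkListenerStageExecutorMetrics(
    execId: String,
    stageId: Int,
    stageAttemptId: Int,
    executorMetrics: Array[Long])

object MutationDemo extends App {
  // Any code receiving the event can mutate it in place; every later
  // listener on the bus then sees the corrupted values.
  val event = SparkListenerStageExecutorMetrics("1", 0, 0, Array(100L, 200L))
  event.executorMetrics(0) = -1L  // compiles fine; silently rewrites shared state
  println(event.executorMetrics.mkString(", "))  // prints: -1, 200

  // An immutable Seq[Long], or a wrapper class with accessor methods, avoids this.
}
```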



---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-28 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r198815695
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/PeakExecutorMetrics.scala ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.executor.ExecutorMetrics
+import org.apache.spark.status.api.v1.PeakMemoryMetrics
+
+/**
+ * Records the peak values for executor level metrics. If 
jvmUsedHeapMemory is -1, then no
+ * values have been recorded yet.
+ */
+private[spark] class PeakExecutorMetrics {
--- End diff --

With ExecutorMetrics removed, it seems useful to have a class for tracking and updating peak metric values that can be used by both EventLoggingListener and AppStatusListener.
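
A minimal sketch of what that shared tracker could look like (the `compareAndUpdate` name matches the AppStatusListener usage in the diffs elsewhere in this thread; the rest is assumed):

```scala
package org.apache.spark.scheduler

// Sketch: records the running peak for each metric slot. compareAndUpdate
// returns true when any peak was raised, so callers know whether an update
// is worth emitting or logging.
private[spark] class PeakExecutorMetrics(numMetrics: Int) {
  private val peaks = Array.fill(numMetrics)(Long.MinValue)

  def compareAndUpdate(latest: Array[Long]): Boolean = {
    var updated = false
    var i = 0
    while (i < math.min(latest.length, peaks.length)) {
      if (latest(i) > peaks(i)) {
        peaks(i) = latest(i)
        updated = true
      }
      i += 1
    }
    updated
  }

  /** Defensive copy, so callers cannot mutate the recorded peaks. */
  def metrics: Array[Long] = peaks.clone()
}
```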


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-27 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r198684121
  
--- Diff: project/MimaExcludes.scala ---
@@ -89,7 +89,13 @@ object MimaExcludes {
 
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasValidationIndicatorCol.validationIndicatorCol"),
 
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasValidationIndicatorCol.getValidationIndicatorCol"),
 
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasValidationIndicatorCol.org$apache$spark$ml$param$shared$HasValidationIndicatorCol$_setter_$validationIndicatorCol_="),
-
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasValidationIndicatorCol.validationIndicatorCol")
+
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasValidationIndicatorCol.validationIndicatorCol"),
+
+// [SPARK-23429][CORE] Add executor memory metrics to heartbeat and 
expose in executors REST API
+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate.apply"),
+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate.copy"),
+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate.this"),
+
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate$")
--- End diff --

Will move up.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-27 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r198683846
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala 
---
@@ -251,6 +261,217 @@ class EventLoggingListenerSuite extends SparkFunSuite 
with LocalSparkContext wit
 }
   }
 
+  /**
+   * Test executor metrics update logging functionality. This checks that a
+   * SparkListenerExecutorMetricsUpdate event is added to the Spark history
+   * log if one of the executor metrics is larger than any previously
+   * recorded value for the metric, per executor per stage. The task 
metrics
--- End diff --

Whoops, that was left over from when it was ExecutorMetricsUpdated.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-27 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r198683408
  
--- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala ---
@@ -98,14 +102,48 @@ class ExecutorSummary private[spark](
 val removeReason: Option[String],
 val executorLogs: Map[String, String],
 val memoryMetrics: Option[MemoryMetrics],
-val blacklistedInStages: Set[Int])
+val blacklistedInStages: Set[Int],
+@JsonSerialize(using = classOf[PeakMemoryMetricsSerializer])
+@JsonDeserialize(using = classOf[PeakMemoryMetricsDeserializer])
+val peakMemoryMetrics: Option[Array[Long]])
 
 class MemoryMetrics private[spark](
 val usedOnHeapStorageMemory: Long,
 val usedOffHeapStorageMemory: Long,
 val totalOnHeapStorageMemory: Long,
 val totalOffHeapStorageMemory: Long)
 
+/** deserialzer for peakMemoryMetrics: convert to array ordered by metric 
name */
+class PeakMemoryMetricsDeserializer private[spark] extends 
JsonDeserializer[Option[Array[Long]]] {
--- End diff --

This is odd, but I can't seem to comment on your earlier comment. Regarding 
having a serializer/deserializer, I also don't have strong feelings -- it makes 
it more readable, but also takes up more space in the history log.

Regarding this comment, thanks -- I hadn't realized that placement meant it marked the constructor. It's meant for the class, and I'll move it.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-27 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r198682917
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala ---
@@ -264,6 +282,11 @@ private[spark] trait SparkListenerInterface {
*/
   def onExecutorMetricsUpdate(executorMetricsUpdate: 
SparkListenerExecutorMetricsUpdate): Unit
 
+  /**
+   * Called when the driver reads stage executor metrics from the history 
log.
--- End diff --

Updated.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-27 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r198682980
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -669,6 +686,29 @@ private[spark] class AppStatusListener(
 }
   }
 }
+event.executorUpdates.foreach { updates: Array[Long] =>
+  // check if there is a new peak value for any of the executor level 
memory metrics
+  liveExecutors.get(event.execId).foreach { exec: LiveExecutor =>
+if (exec.peakExecutorMetrics.compareAndUpdate(updates)) {
+  maybeUpdate(exec, now)
+}
+  }
+}
+  }
+
+  override def onStageExecutorMetrics(executorMetrics: 
SparkListenerStageExecutorMetrics): Unit = {
+val now = System.nanoTime()
+
+// check if there is a new peak value for any of the executor level 
memory metrics
--- End diff --

Unfortunately, yes. I've added some comments.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-27 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r198682809
  
--- Diff: core/src/main/scala/org/apache/spark/metrics/MetricGetter.scala 
---
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.metrics
+
+import java.lang.management.{BufferPoolMXBean, ManagementFactory}
+import javax.management.ObjectName
+
+import org.apache.spark.memory.MemoryManager
+
+sealed trait MetricGetter {
--- End diff --

Added.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-27 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r198682884
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -169,6 +181,28 @@ private[spark] class EventLoggingListener(
 
   // Events that trigger a flush
   override def onStageCompleted(event: SparkListenerStageCompleted): Unit 
= {
+if (shouldLogExecutorMetricsUpdates) {
+  // clear out any previous attempts, that did not have a stage 
completed event
+  val prevAttemptId = event.stageInfo.attemptNumber() - 1
+  for (attemptId <- 0 to prevAttemptId) {
+liveStageExecutorMetrics.remove((event.stageInfo.stageId, 
attemptId))
+  }
+
+  // log the peak executor metrics for the stage, for each live 
executor,
+  // whether or not the executor is running tasks for the stage
+  val executorMap = liveStageExecutorMetrics.remove(
+(event.stageInfo.stageId, event.stageInfo.attemptNumber()))
+  executorMap.foreach {
+   executorEntry => {
+  for ((executorId, peakExecutorMetrics) <- executorEntry) {
--- End diff --

Yes, the naming is confusing. Changed to the first option.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-27 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r198682779
  
--- Diff: core/src/main/scala/org/apache/spark/Heartbeater.scala ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.memory.MemoryManager
+import org.apache.spark.metrics.MetricGetter
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * Creates a heartbeat thread which will call the specified 
reportHeartbeat function at
+ * intervals of intervalMs.
+ *
+ * @param memoryManager the memory manager for execution and storage 
memory.
+ * @param reportHeartbeat the heartbeat reporting function to call.
+ * @param name the thread name for the heartbeater.
+ * @param intervalMs the interval between heartbeats.
+ */
+private[spark] class Heartbeater(
+memoryManager: MemoryManager,
+reportHeartbeat: () => Unit,
+name: String,
+intervalMs: Long) extends Logging {
+  // Executor for the heartbeat task
+  private val heartbeater = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor(name)
+
+  /** Schedules a task to report a heartbeat. */
+  private[spark] def start(): Unit = {
--- End diff --

Removed.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-27 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r198682787
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -1922,6 +1928,12 @@ class SparkContext(config: SparkConf) extends 
Logging {
 Utils.tryLogNonFatalError {
   _eventLogger.foreach(_.stop())
 }
+if(_heartbeater != null) {
--- End diff --

Added.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-27 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r198611581
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -169,6 +181,28 @@ private[spark] class EventLoggingListener(
 
   // Events that trigger a flush
   override def onStageCompleted(event: SparkListenerStageCompleted): Unit 
= {
+if (shouldLogExecutorMetricsUpdates) {
+  // clear out any previous attempts, that did not have a stage 
completed event
+  val prevAttemptId = event.stageInfo.attemptNumber() - 1
+  for (attemptId <- 0 to prevAttemptId) {
+liveStageExecutorMetrics.remove((event.stageInfo.stageId, 
attemptId))
+  }
+
+  // log the peak executor metrics for the stage, for each live 
executor,
+  // whether or not the executor is running tasks for the stage
+  val executorMap = liveStageExecutorMetrics.remove(
+(event.stageInfo.stageId, event.stageInfo.attemptNumber()))
+  executorMap.foreach {
+   executorEntry => {
+  for ((executorId, peakExecutorMetrics) <- executorEntry) {
--- End diff --

couple of minor things -- I wouldn't use `executorMap` for the `Option` and then `executorEntry` for the Map.  Also, we tend to prefer using `foreach` over Scala's `for` loop; the one exception is when it can clean up extra nesting for a bunch of loops (which you actually have here).

So I'd go with

```scala
val executorOpt = liveStageExecutorMetrics.remove(
  (event.stageInfo.stageId, event.stageInfo.attemptNumber()))
executorOpt.foreach { execMap =>
  execMap.foreach { case (executorId, peakExecutorMetrics) =>
...
  }
}
```

or if you want to use the for loop, use it around everything:

```scala
val execOpt = liveStageExecutorMetrics.remove(
  (event.stageInfo.stageId, event.stageInfo.attemptNumber()))
for {
  execMap <- execOpt
  (executorId, peakExecutorMetrics) <- execMap
} {
  ...
}
```


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-27 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r198628058
  
--- Diff: project/MimaExcludes.scala ---
@@ -89,7 +89,13 @@ object MimaExcludes {
 
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasValidationIndicatorCol.validationIndicatorCol"),
 
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasValidationIndicatorCol.getValidationIndicatorCol"),
 
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasValidationIndicatorCol.org$apache$spark$ml$param$shared$HasValidationIndicatorCol$_setter_$validationIndicatorCol_="),
-
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasValidationIndicatorCol.validationIndicatorCol")
+
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasValidationIndicatorCol.validationIndicatorCol"),
+
+// [SPARK-23429][CORE] Add executor memory metrics to heartbeat and 
expose in executors REST API
+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate.apply"),
+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate.copy"),
+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate.this"),
+
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate$")
--- End diff --

minor: I hear we put new ones at the top now (so you don't have to add a 
comma to the previous one)


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-27 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r198618559
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -669,6 +686,29 @@ private[spark] class AppStatusListener(
 }
   }
 }
+event.executorUpdates.foreach { updates: Array[Long] =>
+  // check if there is a new peak value for any of the executor level 
memory metrics
+  liveExecutors.get(event.execId).foreach { exec: LiveExecutor =>
+if (exec.peakExecutorMetrics.compareAndUpdate(updates)) {
+  maybeUpdate(exec, now)
+}
+  }
+}
+  }
+
+  override def onStageExecutorMetrics(executorMetrics: 
SparkListenerStageExecutorMetrics): Unit = {
+val now = System.nanoTime()
+
+// check if there is a new peak value for any of the executor level 
memory metrics
--- End diff --

do you need this here *and* in `onExecutorMetricsUpdate`?  I guess you do, because one is for reading from the logs, and the other is for the live UI?  If so, it's worth putting in a comment about that.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-27 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r198625661
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala 
---
@@ -251,6 +261,217 @@ class EventLoggingListenerSuite extends SparkFunSuite 
with LocalSparkContext wit
 }
   }
 
+  /**
+   * Test executor metrics update logging functionality. This checks that a
+   * SparkListenerExecutorMetricsUpdate event is added to the Spark history
+   * log if one of the executor metrics is larger than any previously
+   * recorded value for the metric, per executor per stage. The task 
metrics
--- End diff --

I think you mean *executor* metrics, not *task* metrics, right?


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-27 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r198624068
  
--- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala ---
@@ -98,14 +101,53 @@ class ExecutorSummary private[spark](
 val removeReason: Option[String],
 val executorLogs: Map[String, String],
 val memoryMetrics: Option[MemoryMetrics],
-val blacklistedInStages: Set[Int])
+val blacklistedInStages: Set[Int],
+@JsonSerialize(using = classOf[PeakMemoryMetricsSerializer])
+@JsonDeserialize(using = classOf[PeakMemoryMetricsDeserializer])
+val peakMemoryMetrics: Option[Array[Long]])
 
 class MemoryMetrics private[spark](
 val usedOnHeapStorageMemory: Long,
 val usedOffHeapStorageMemory: Long,
 val totalOnHeapStorageMemory: Long,
 val totalOffHeapStorageMemory: Long)
 
+/** deserialzer for peakMemoryMetrics: convert to array ordered by metric 
name */
+class PeakMemoryMetricsDeserializer extends 
JsonDeserializer[Option[Array[Long]]] {
+  override def deserialize(
+  jsonParser: JsonParser,
+  deserializationContext: DeserializationContext): Option[Array[Long]] 
= {
+val metricsMap = jsonParser.readValueAs(classOf[Option[Map[String, 
Object]]])
+metricsMap match {
+  case Some(metrics) =>
+Some(MetricGetter.values.map { m =>
+  metrics.getOrElse (m.name, 0L) match {
+case intVal: Int => intVal.toLong
+case longVal: Long => longVal
+  }
+}.toArray)
+  case None => None
+}
+  }
+}
+
+/** serializer for peakMemoryMetrics: convert array to map with metric 
name as key */
+class PeakMemoryMetricsSerializer extends 
JsonSerializer[Option[Array[Long]]] {
+  override def serialize(
+  metrics: Option[Array[Long]],
+  jsonGenerator: JsonGenerator,
+  serializerProvider: SerializerProvider): Unit = {
+metrics match {
+  case Some(m) =>
+val metricsMap = (0 until MetricGetter.values.length).map { idx =>
--- End diff --

I don't really feel strongly.  If you don't find it helpful here, then we might as well get rid of it; it's just a convenience meant for cases like this.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-27 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r198610162
  
--- Diff: core/src/main/scala/org/apache/spark/metrics/MetricGetter.scala 
---
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.metrics
+
+import java.lang.management.{BufferPoolMXBean, ManagementFactory}
+import javax.management.ObjectName
+
+import org.apache.spark.memory.MemoryManager
+
+sealed trait MetricGetter {
--- End diff --

`private[spark]` on all of these classes


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-27 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r198624819
  
--- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala ---
@@ -98,14 +102,48 @@ class ExecutorSummary private[spark](
 val removeReason: Option[String],
 val executorLogs: Map[String, String],
 val memoryMetrics: Option[MemoryMetrics],
-val blacklistedInStages: Set[Int])
+val blacklistedInStages: Set[Int],
+@JsonSerialize(using = classOf[PeakMemoryMetricsSerializer])
+@JsonDeserialize(using = classOf[PeakMemoryMetricsDeserializer])
+val peakMemoryMetrics: Option[Array[Long]])
 
 class MemoryMetrics private[spark](
 val usedOnHeapStorageMemory: Long,
 val usedOffHeapStorageMemory: Long,
 val totalOnHeapStorageMemory: Long,
 val totalOffHeapStorageMemory: Long)
 
+/** deserialzer for peakMemoryMetrics: convert to array ordered by metric 
name */
+class PeakMemoryMetricsDeserializer private[spark] extends 
JsonDeserializer[Option[Array[Long]]] {
+  override def deserialize(
+  jsonParser: JsonParser,
+  deserializationContext: DeserializationContext): Option[Array[Long]] 
= {
+val metricsMap = jsonParser.readValueAs[Option[Map[String, Long]]](
+  new TypeReference[Option[Map[String, java.lang.Long]]] {})
+metricsMap match {
+  case Some(metrics) =>
+Some(MetricGetter.values.map(m => metrics.getOrElse(m.name, 
0L)).toArray)
+  case None => None
+}
+  }
+}
+/** serializer for peakMemoryMetrics: convert array to map with metric 
name as key */
+class PeakMemoryMetricsSerializer private[spark] extends 
JsonSerializer[Option[Array[Long]]] {
--- End diff --

same here


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-27 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r198613341
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala ---
@@ -264,6 +282,11 @@ private[spark] trait SparkListenerInterface {
*/
   def onExecutorMetricsUpdate(executorMetricsUpdate: 
SparkListenerExecutorMetricsUpdate): Unit
 
+  /**
+   * Called when the driver reads stage executor metrics from the history 
log.
--- End diff --

Called with the peak memory metrics for a given (executor, stage) 
combination.  Note that this is only present when reading from the event log 
(as in the history server), and is never called in a live application.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-27 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r198609741
  
--- Diff: core/src/main/scala/org/apache/spark/Heartbeater.scala ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.memory.MemoryManager
+import org.apache.spark.metrics.MetricGetter
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * Creates a heartbeat thread which will call the specified 
reportHeartbeat function at
+ * intervals of intervalMs.
+ *
+ * @param memoryManager the memory manager for execution and storage 
memory.
+ * @param reportHeartbeat the heartbeat reporting function to call.
+ * @param name the thread name for the heartbeater.
+ * @param intervalMs the interval between heartbeats.
+ */
+private[spark] class Heartbeater(
+memoryManager: MemoryManager,
+reportHeartbeat: () => Unit,
+name: String,
+intervalMs: Long) extends Logging {
+  // Executor for the heartbeat task
+  private val heartbeater = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor(name)
+
+  /** Schedules a task to report a heartbeat. */
+  private[spark] def start(): Unit = {
--- End diff --

nit: whole class is `private[spark]` so you don't need to add this to 
individual methods


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-27 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r198624800
  
--- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala ---
@@ -98,14 +102,48 @@ class ExecutorSummary private[spark](
 val removeReason: Option[String],
 val executorLogs: Map[String, String],
 val memoryMetrics: Option[MemoryMetrics],
-val blacklistedInStages: Set[Int])
+val blacklistedInStages: Set[Int],
+@JsonSerialize(using = classOf[PeakMemoryMetricsSerializer])
+@JsonDeserialize(using = classOf[PeakMemoryMetricsDeserializer])
+val peakMemoryMetrics: Option[Array[Long]])
 
 class MemoryMetrics private[spark](
 val usedOnHeapStorageMemory: Long,
 val usedOffHeapStorageMemory: Long,
 val totalOnHeapStorageMemory: Long,
 val totalOffHeapStorageMemory: Long)
 
+/** deserialzer for peakMemoryMetrics: convert to array ordered by metric 
name */
+class PeakMemoryMetricsDeserializer private[spark] extends 
JsonDeserializer[Option[Array[Long]]] {
--- End diff --

`private[spark] class ...`

I'm not sure what `private[spark]` does in the place you have it -- it 
might mark the constructor.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-27 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r198611872
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/PeakExecutorMetrics.scala ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.executor.ExecutorMetrics
+import org.apache.spark.status.api.v1.PeakMemoryMetrics
+
+/**
+ * Records the peak values for executor level metrics. If 
jvmUsedHeapMemory is -1, then no
+ * values have been recorded yet.
+ */
+private[spark] class PeakExecutorMetrics {
--- End diff --

ping


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-27 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r198609800
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -1922,6 +1928,12 @@ class SparkContext(config: SparkConf) extends 
Logging {
 Utils.tryLogNonFatalError {
   _eventLogger.foreach(_.stop())
 }
+if(_heartbeater != null) {
--- End diff --

nit: space before (


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-18 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r196236364
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -169,6 +182,31 @@ private[spark] class EventLoggingListener(
 
   // Events that trigger a flush
   override def onStageCompleted(event: SparkListenerStageCompleted): Unit 
= {
+if (shouldLogExecutorMetricsUpdates) {
+  // clear out any previous attempts, that did not have a stage 
completed event
+  val prevAttemptId = event.stageInfo.attemptNumber() - 1
+  for (attemptId <- 0 to prevAttemptId) {
+liveStageExecutorMetrics.remove((event.stageInfo.stageId, 
attemptId))
+  }
+
+  // log the peak executor metrics for the stage, for each live 
executor,
+  // whether or not the executor is running tasks for the stage
+  val accumUpdates = new ArrayBuffer[(Long, Int, Int, 
Seq[AccumulableInfo])]()
+  val executorMap = liveStageExecutorMetrics.remove(
+(event.stageInfo.stageId, event.stageInfo.attemptNumber()))
+  executorMap.foreach {
+   executorEntry => {
+  for ((executorId, peakExecutorMetrics) <- executorEntry) {
+val executorMetrics = new ExecutorMetrics(-1, 
peakExecutorMetrics.metrics)
--- End diff --

Just discussed with @squito -- thanks! Logging -1 for the timestamp is confusing and hacky.

Some items discussed:

For ExecutorMetrics, the timestamp can be made optional, or the class can be removed completely and replaced by Array[Long], with comments explaining how the metrics work.

For logging, the stage ID could be added as part of the executorMetrics in SparkListenerExecutorMetricsUpdate, but this is awkward, since that information is used only for logging, not as part of the heartbeat. While the application is running, there could be multiple stages active when the metrics are gathered, so specifying a single stage ID doesn't make sense. For logging, the metrics are the peak values for a particular stage, so they are associated with that stage.

Another option is to add the information to SparkListenerStageCompleted, 
but this would bloat the event if there are many executors.

A third option is to create a new event, SparkListenerStageExecutorMetrics, 
which would have the executor ID, stage ID and attempt, and peak metrics. 

I'll give the third option a try, and will add details to the design doc once this is more finalized.
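
A condensed sketch of what the third option could look like at stage completion (member names like `liveStageExecutorMetrics` and `logEvent` match the EventLoggingListener diffs elsewhere in this thread; details still subject to change):

```scala
// Sketch only: when a stage completes, emit one SparkListenerStageExecutorMetrics
// per live executor, carrying that executor's peak values for this stage attempt.
override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
  val executorOpt = liveStageExecutorMetrics.remove(
    (event.stageInfo.stageId, event.stageInfo.attemptNumber()))
  executorOpt.foreach { execMap =>
    execMap.foreach { case (executorId, peakExecutorMetrics) =>
      logEvent(SparkListenerStageExecutorMetrics(executorId, event.stageInfo.stageId,
        event.stageInfo.attemptNumber(), peakExecutorMetrics.metrics))
    }
  }
}
```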


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-18 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r19695
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -169,6 +182,31 @@ private[spark] class EventLoggingListener(
 
   // Events that trigger a flush
   override def onStageCompleted(event: SparkListenerStageCompleted): Unit 
= {
+if (shouldLogExecutorMetricsUpdates) {
+  // clear out any previous attempts, that did not have a stage 
completed event
+  val prevAttemptId = event.stageInfo.attemptNumber() - 1
+  for (attemptId <- 0 to prevAttemptId) {
+liveStageExecutorMetrics.remove((event.stageInfo.stageId, 
attemptId))
+  }
+
+  // log the peak executor metrics for the stage, for each live 
executor,
+  // whether or not the executor is running tasks for the stage
+  val accumUpdates = new ArrayBuffer[(Long, Int, Int, 
Seq[AccumulableInfo])]()
+  val executorMap = liveStageExecutorMetrics.remove(
+(event.stageInfo.stageId, event.stageInfo.attemptNumber()))
+  executorMap.foreach {
+   executorEntry => {
+  for ((executorId, peakExecutorMetrics) <- executorEntry) {
+val executorMetrics = new ExecutorMetrics(-1, 
peakExecutorMetrics.metrics)
--- End diff --

there is no point in logging the timestamp if we only log -1.  Better to just remove it.  Are you doing anything with it in the live UI?  Or should we just get rid of the timestamp field entirely?  (It can always be added later if there is a use for it.)

I agree that having one timestamp for the peak across all metrics isn't very useful.  It's possible *all* the metrics would hit a peak at nearly the same time, but the more metrics we add, the less likely that becomes.  And even if we had the timestamp for each metric, what would we do with it?


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-17 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r195957438
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -169,6 +182,31 @@ private[spark] class EventLoggingListener(
 
   // Events that trigger a flush
   override def onStageCompleted(event: SparkListenerStageCompleted): Unit 
= {
+if (shouldLogExecutorMetricsUpdates) {
+  // clear out any previous attempts, that did not have a stage 
completed event
+  val prevAttemptId = event.stageInfo.attemptNumber() - 1
+  for (attemptId <- 0 to prevAttemptId) {
+liveStageExecutorMetrics.remove((event.stageInfo.stageId, 
attemptId))
+  }
+
+  // log the peak executor metrics for the stage, for each live 
executor,
+  // whether or not the executor is running tasks for the stage
+  val accumUpdates = new ArrayBuffer[(Long, Int, Int, 
Seq[AccumulableInfo])]()
+  val executorMap = liveStageExecutorMetrics.remove(
+(event.stageInfo.stageId, event.stageInfo.attemptNumber()))
+  executorMap.foreach {
+   executorEntry => {
+  for ((executorId, peakExecutorMetrics) <- executorEntry) {
+val executorMetrics = new ExecutorMetrics(-1, 
peakExecutorMetrics.metrics)
--- End diff --

The last timestamp seems like it wouldn't have enough information, since 
peaks for different metrics could occur at different times, and with different 
combinations of stages running. 

Only -1 would be logged. Right now it's writing out SparkListenerExecutorMetricsUpdate events, which contain ExecutorMetrics, which has a timestamp field. Do you think the timestamp should be removed from ExecutorMetrics? It seems good to have the timestamp for when the metrics were gathered, but it's not being exposed at this point.

For both the history server and the live UI, the goal is to show the peak value for each metric for each executor. For the executors tab, this is the peak value of each metric over the lifetime of the executor. For the stages tab, this is the peak value for each metric for that executor while the stage is running. The executor could be processing tasks for other stages as well, if there are concurrent stages, or no tasks for this stage if it isn't assigned any, but it is the peak value between the time the stage starts and ends.

Can you describe how the stage level metrics would work with the last timestamp for any peak metric? Would there be a check to see if the event is being read from the history log?


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-17 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r195955081
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -169,6 +182,31 @@ private[spark] class EventLoggingListener(
 
   // Events that trigger a flush
   override def onStageCompleted(event: SparkListenerStageCompleted): Unit 
= {
+if (shouldLogExecutorMetricsUpdates) {
+  // clear out any previous attempts, that did not have a stage 
completed event
+  val prevAttemptId = event.stageInfo.attemptNumber() - 1
+  for (attemptId <- 0 to prevAttemptId) {
+liveStageExecutorMetrics.remove((event.stageInfo.stageId, 
attemptId))
+  }
+
+  // log the peak executor metrics for the stage, for each live 
executor,
+  // whether or not the executor is running tasks for the stage
+  val accumUpdates = new ArrayBuffer[(Long, Int, Int, 
Seq[AccumulableInfo])]()
+  val executorMap = liveStageExecutorMetrics.remove(
+(event.stageInfo.stageId, event.stageInfo.attemptNumber()))
+  executorMap.foreach {
+   executorEntry => {
+  for ((executorId, peakExecutorMetrics) <- executorEntry) {
+val executorMetrics = new ExecutorMetrics(-1, 
peakExecutorMetrics.metrics)
--- End diff --

I can see how this would work, but it also seems far more confusing than necessary.  My understanding was that you'd always log the timestamp of the last update that replaced the peak value for *any* metric.  Are you ever logging something other than -1 for the timestamp?  If not, we just shouldn't put any timestamp in the log.

It might be helpful to step back a bit and, rather than focusing on the mechanics of what you're doing now, discuss the desired end behavior in the history server and the live UI based on the timestamp.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-15 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r195892287
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala 
---
@@ -251,6 +261,222 @@ class EventLoggingListenerSuite extends SparkFunSuite 
with LocalSparkContext wit
 }
   }
 
+  /**
+   * Test executor metrics update logging functionality. This checks that a
+   * SparkListenerExecutorMetricsUpdate event is added to the Spark history
+   * log if one of the executor metrics is larger than any previously
+   * recorded value for the metric, per executor per stage. The task 
metrics
+   * should not be added.
+   */
+  private def testExecutorMetricsUpdateEventLogging() {
+val conf = getLoggingConf(testDirPath, None)
+val logName = "executorMetricsUpdated-test"
+val eventLogger = new EventLoggingListener(logName, None, 
testDirPath.toUri(), conf)
+val listenerBus = new LiveListenerBus(conf)
+
+// expected ExecutorMetricsUpdate, for the given stage id and executor 
id
+val expectedMetricsEvents: Map[(Int, String), 
SparkListenerExecutorMetricsUpdate] =
--- End diff --

I'll add a check.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-15 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r195892271
  
--- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala ---
@@ -98,14 +101,53 @@ class ExecutorSummary private[spark](
 val removeReason: Option[String],
 val executorLogs: Map[String, String],
 val memoryMetrics: Option[MemoryMetrics],
-val blacklistedInStages: Set[Int])
+val blacklistedInStages: Set[Int],
+@JsonSerialize(using = classOf[PeakMemoryMetricsSerializer])
+@JsonDeserialize(using = classOf[PeakMemoryMetricsDeserializer])
+val peakMemoryMetrics: Option[Array[Long]])
 
 class MemoryMetrics private[spark](
 val usedOnHeapStorageMemory: Long,
 val usedOffHeapStorageMemory: Long,
 val totalOnHeapStorageMemory: Long,
 val totalOffHeapStorageMemory: Long)
 
+/** deserialzer for peakMemoryMetrics: convert to array ordered by metric 
name */
+class PeakMemoryMetricsDeserializer extends 
JsonDeserializer[Option[Array[Long]]] {
--- End diff --

Yes, changed.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-15 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r195892263
  
--- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala ---
@@ -98,14 +101,53 @@ class ExecutorSummary private[spark](
 val removeReason: Option[String],
 val executorLogs: Map[String, String],
 val memoryMetrics: Option[MemoryMetrics],
-val blacklistedInStages: Set[Int])
+val blacklistedInStages: Set[Int],
+@JsonSerialize(using = classOf[PeakMemoryMetricsSerializer])
+@JsonDeserialize(using = classOf[PeakMemoryMetricsDeserializer])
+val peakMemoryMetrics: Option[Array[Long]])
 
 class MemoryMetrics private[spark](
 val usedOnHeapStorageMemory: Long,
 val usedOffHeapStorageMemory: Long,
 val totalOnHeapStorageMemory: Long,
 val totalOffHeapStorageMemory: Long)
 
+/** deserialzer for peakMemoryMetrics: convert to array ordered by metric 
name */
+class PeakMemoryMetricsDeserializer extends 
JsonDeserializer[Option[Array[Long]]] {
+  override def deserialize(
+  jsonParser: JsonParser,
+  deserializationContext: DeserializationContext): Option[Array[Long]] 
= {
+val metricsMap = jsonParser.readValueAs(classOf[Option[Map[String, 
Object]]])
--- End diff --

That works and is much cleaner, thanks!
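
For reference, a standalone sketch of the `TypeReference` trick outside of Spark (assumes jackson-databind and jackson-module-scala on the classpath; not the exact PR code):

```scala
import com.fasterxml.jackson.core.`type`.TypeReference
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

object PeakMetricsJsonExample {
  def main(args: Array[String]): Unit = {
    val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
    val json = """{"JVMHeapMemory": 1234, "JVMOffHeapMemory": 567}"""
    // The anonymous TypeReference subclass carries the generic type through
    // erasure, so Jackson coerces every value to java.lang.Long instead of
    // handing back a mix of Integer and Long.
    val metrics = mapper.readValue(json, new TypeReference[Map[String, java.lang.Long]] {})
    println(metrics("JVMHeapMemory"))  // 1234
  }
}
```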


---



