Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/21221#discussion_r198624819
--- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala ---
@@ -98,14 +102,48 @@ class ExecutorSummary private[spark](
val removeReason: Option[String],
val executorLogs: Map[String, String],
val memoryMetrics: Option[MemoryMetrics],
- val blacklistedInStages: Set[Int])
+ val blacklistedInStages: Set[Int],
+ @JsonSerialize(using = classOf[PeakMemoryMetricsSerializer])
+ @JsonDeserialize(using = classOf[PeakMemoryMetricsDeserializer])
+ val peakMemoryMetrics: Option[Array[Long]])
class MemoryMetrics private[spark](
val usedOnHeapStorageMemory: Long,
val usedOffHeapStorageMemory: Long,
val totalOnHeapStorageMemory: Long,
val totalOffHeapStorageMemory: Long)
+/** deserializer for peakMemoryMetrics: convert to array ordered by metric
name */
+class PeakMemoryMetricsDeserializer private[spark] extends
JsonDeserializer[Option[Array[Long]]] {
+ override def deserialize(
+ jsonParser: JsonParser,
+ deserializationContext: DeserializationContext): Option[Array[Long]]
= {
+ val metricsMap = jsonParser.readValueAs[Option[Map[String, Long]]](
+ new TypeReference[Option[Map[String, java.lang.Long]]] {})
+ metricsMap match {
+ case Some(metrics) =>
+ Some(MetricGetter.values.map(m => metrics.getOrElse(m.name,
0L)).toArray)
+ case None => None
+ }
+ }
+}
+/** serializer for peakMemoryMetrics: convert array to map with metric
name as key */
+class PeakMemoryMetricsSerializer private[spark] extends
JsonSerializer[Option[Array[Long]]] {
--- End diff --
same here
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]