This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d0ec53cf953 [SPARK-41422][UI] Protobuf serializer for ExecutorSummaryWrapper
d0ec53cf953 is described below

commit d0ec53cf9531cba6a154b98d4593a0be50e712d8
Author: Sandeep Singh <sand...@techaddict.me>
AuthorDate: Fri Dec 23 00:30:33 2022 -0800

    [SPARK-41422][UI] Protobuf serializer for ExecutorSummaryWrapper
    
    ### What changes were proposed in this pull request?
    Add Protobuf serializer for ExecutorSummaryWrapper
    
    ### Why are the changes needed?
    Support fast and compact serialization/deserialization for ExecutorSummaryWrapper over RocksDB.
    
    ### Does this PR introduce any user-facing change?
    No
    
    ### How was this patch tested?
    New UT
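    
    For illustration, a minimal round trip through the new serializer — a
    sketch, assuming a KVStoreProtobufSerializer instance, as exercised by
    the new test below:
    
        val serializer = new KVStoreProtobufSerializer()
        // serialize to a compact Protobuf binary suitable for RocksDB
        val bytes = serializer.serialize(wrapper)
        // deserialize back into the original wrapper type
        val back = serializer.deserialize(bytes, classOf[ExecutorSummaryWrapper])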
    
    Closes #39141 from techaddict/SPARK-41422-ExecutorSummaryWrapper.
    
    Authored-by: Sandeep Singh <sand...@techaddict.me>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../apache/spark/status/protobuf/store_types.proto |  50 ++++++
 .../org.apache.spark.status.protobuf.ProtobufSerDe |   1 +
 .../ExecutorSummaryWrapperSerializer.scala         | 183 +++++++++++++++++++++
 .../protobuf/KVStoreProtobufSerializerSuite.scala  | 131 ++++++++++++++-
 4 files changed, 364 insertions(+), 1 deletion(-)

diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index 5949ad63c84..7cf5c2921cb 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -305,3 +305,53 @@ message SpeculationStageSummaryWrapper {
   int32 stage_attempt_id = 2;
   SpeculationStageSummary info = 3;
 }
+
+message MemoryMetrics {
+  int64 used_on_heap_storage_memory = 1;
+  int64 used_off_heap_storage_memory = 2;
+  int64 total_on_heap_storage_memory = 3;
+  int64 total_off_heap_storage_memory = 4;
+}
+
+message ResourceInformation {
+  string name = 1;
+  repeated string addresses = 2;
+}
+
+message ExecutorSummary {
+  string id = 1;
+  string host_port = 2;
+  bool is_active = 3;
+  int32 rdd_blocks = 4;
+  int64 memory_used = 5;
+  int64 disk_used = 6;
+  int32 total_cores = 7;
+  int32 max_tasks = 8;
+  int32 active_tasks = 9;
+  int32 failed_tasks = 10;
+  int32 completed_tasks = 11;
+  int32 total_tasks = 12;
+  int64 total_duration = 13;
+  int64 total_gc_time = 14;
+  int64 total_input_bytes = 15;
+  int64 total_shuffle_read = 16;
+  int64 total_shuffle_write = 17;
+  bool is_blacklisted = 18;
+  int64 max_memory = 19;
+  int64 add_time = 20;
+  optional int64 remove_time = 21;
+  optional string remove_reason = 22;
+  map<string, string> executor_logs = 23;
+  optional MemoryMetrics memory_metrics = 24;
+  repeated int64 blacklisted_in_stages = 25;
+  optional ExecutorMetrics peak_memory_metrics = 26;
+  map<string, string> attributes = 27;
+  map<string, ResourceInformation> resources = 28;
+  int32 resource_profile_id = 29;
+  bool is_excluded = 30;
+  repeated int64 excluded_in_stages = 31;
+}
+
+message ExecutorSummaryWrapper {
+  ExecutorSummary info = 1;
+}
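
Note: the `optional` fields above generate presence accessors (hasRemoveTime,
hasMemoryMetrics, ...) on the generated StoreTypes Java classes, and the Scala
deserializer below maps that presence onto Option. A minimal sketch of the
pattern (removeTimeOf is a hypothetical helper name):

    def removeTimeOf(summary: StoreTypes.ExecutorSummary): Option[java.util.Date] =
      // only wrap the value when the optional field was actually set
      if (summary.hasRemoveTime) Some(new java.util.Date(summary.getRemoveTime)) else None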
diff --git a/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe b/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
index 97c186206bf..b714ea73b36 100644
--- a/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
+++ b/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
@@ -25,3 +25,4 @@ org.apache.spark.status.protobuf.TaskDataWrapperSerializer
 org.apache.spark.status.protobuf.JobDataWrapperSerializer
 org.apache.spark.status.protobuf.ResourceProfileWrapperSerializer
 org.apache.spark.status.protobuf.SpeculationStageSummaryWrapperSerializer
+org.apache.spark.status.protobuf.ExecutorSummaryWrapperSerializer
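
This services entry makes the new serializer discoverable through Java's
ServiceLoader mechanism. A hedged sketch of how such a registry is typically
assembled from these entries (Spark's exact wiring may differ):

    import java.util.ServiceLoader
    import scala.collection.JavaConverters._

    // load every ProtobufSerDe implementation listed in META-INF/services
    val serdes = ServiceLoader.load(classOf[ProtobufSerDe]).asScala
    // index serializers by the KVStore class each one supports
    val serializerFor = serdes.map(s => s.supportClass -> s).toMap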
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala
new file mode 100644
index 00000000000..03a810157d7
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf
+
+import java.util.Date
+
+import collection.JavaConverters._
+
+import org.apache.spark.resource.ResourceInformation
+import org.apache.spark.status.ExecutorSummaryWrapper
+import org.apache.spark.status.api.v1.{ExecutorSummary, MemoryMetrics}
+import org.apache.spark.status.protobuf.Utils.getOptional
+
+class ExecutorSummaryWrapperSerializer extends ProtobufSerDe {
+
+  override val supportClass: Class[_] = classOf[ExecutorSummaryWrapper]
+
+  override def serialize(input: Any): Array[Byte] = {
+    serialize(input.asInstanceOf[ExecutorSummaryWrapper])
+  }
+
+  def serialize(input: ExecutorSummaryWrapper): Array[Byte] = {
+    val info = serializeExecutorSummary(input.info)
+    val builder = StoreTypes.ExecutorSummaryWrapper.newBuilder()
+      .setInfo(info)
+    builder.build().toByteArray
+  }
+
+  def deserialize(bytes: Array[Byte]): ExecutorSummaryWrapper = {
+    val binary = StoreTypes.ExecutorSummaryWrapper.parseFrom(bytes)
+    val info = deserializeExecutorSummary(binary.getInfo)
+    new ExecutorSummaryWrapper(info = info)
+  }
+
+  private def serializeExecutorSummary(
+      input: ExecutorSummary): StoreTypes.ExecutorSummary = {
+    val builder = StoreTypes.ExecutorSummary.newBuilder()
+      .setId(input.id)
+      .setHostPort(input.hostPort)
+      .setIsActive(input.isActive)
+      .setRddBlocks(input.rddBlocks)
+      .setMemoryUsed(input.memoryUsed)
+      .setDiskUsed(input.diskUsed)
+      .setTotalCores(input.totalCores)
+      .setMaxTasks(input.maxTasks)
+      .setActiveTasks(input.activeTasks)
+      .setFailedTasks(input.failedTasks)
+      .setCompletedTasks(input.completedTasks)
+      .setTotalTasks(input.totalTasks)
+      .setTotalDuration(input.totalDuration)
+      .setTotalGcTime(input.totalGCTime)
+      .setTotalInputBytes(input.totalInputBytes)
+      .setTotalShuffleRead(input.totalShuffleRead)
+      .setTotalShuffleWrite(input.totalShuffleWrite)
+      .setIsBlacklisted(input.isBlacklisted)
+      .setMaxMemory(input.maxMemory)
+      .setAddTime(input.addTime.getTime)
+
+    input.removeTime.foreach {
+      date => builder.setRemoveTime(date.getTime)
+    }
+    input.removeReason.foreach(builder.setRemoveReason)
+    input.executorLogs.foreach { case (k, v) =>
+      builder.putExecutorLogs(k, v)
+    }
+    input.memoryMetrics.foreach { metrics =>
+      builder.setMemoryMetrics(serializeMemoryMetrics(metrics))
+    }
+    input.blacklistedInStages.foreach { stage =>
+      builder.addBlacklistedInStages(stage.toLong)
+    }
+    input.peakMemoryMetrics.foreach { metrics =>
+      builder.setPeakMemoryMetrics(ExecutorMetricsSerializer.serialize(metrics))
+    }
+    input.attributes.foreach { case (k, v) =>
+      builder.putAttributes(k, v)
+    }
+    input.resources.foreach { case (k, v) =>
+      builder.putResources(k, serializeResourceInformation(v))
+    }
+
+    builder.setResourceProfileId(input.resourceProfileId)
+    builder.setIsExcluded(input.isExcluded)
+
+    input.excludedInStages.foreach { stage =>
+      builder.addExcludedInStages(stage.toLong)
+    }
+
+    builder.build()
+  }
+
+  private def deserializeExecutorSummary(
+      binary: StoreTypes.ExecutorSummary): ExecutorSummary = {
+    val peakMemoryMetrics =
+      getOptional(binary.hasPeakMemoryMetrics,
+        () => ExecutorMetricsSerializer.deserialize(binary.getPeakMemoryMetrics))
+    val removeTime = getOptional(binary.hasRemoveTime, () => new Date(binary.getRemoveTime))
+    val removeReason = getOptional(binary.hasRemoveReason, () => binary.getRemoveReason)
+    val memoryMetrics =
+      getOptional(binary.hasMemoryMetrics,
+        () => deserializeMemoryMetrics(binary.getMemoryMetrics))
+    new ExecutorSummary(
+      id = binary.getId,
+      hostPort = binary.getHostPort,
+      isActive = binary.getIsActive,
+      rddBlocks = binary.getRddBlocks,
+      memoryUsed = binary.getMemoryUsed,
+      diskUsed = binary.getDiskUsed,
+      totalCores = binary.getTotalCores,
+      maxTasks = binary.getMaxTasks,
+      activeTasks = binary.getActiveTasks,
+      failedTasks = binary.getFailedTasks,
+      completedTasks = binary.getCompletedTasks,
+      totalTasks = binary.getTotalTasks,
+      totalDuration = binary.getTotalDuration,
+      totalGCTime = binary.getTotalGcTime,
+      totalInputBytes = binary.getTotalInputBytes,
+      totalShuffleRead = binary.getTotalShuffleRead,
+      totalShuffleWrite = binary.getTotalShuffleWrite,
+      isBlacklisted = binary.getIsBlacklisted,
+      maxMemory = binary.getMaxMemory,
+      addTime = new Date(binary.getAddTime),
+      removeTime = removeTime,
+      removeReason = removeReason,
+      executorLogs = binary.getExecutorLogsMap.asScala.toMap,
+      memoryMetrics = memoryMetrics,
+      blacklistedInStages = binary.getBlacklistedInStagesList.asScala.map(_.toInt).toSet,
+      peakMemoryMetrics = peakMemoryMetrics,
+      attributes = binary.getAttributesMap.asScala.toMap,
+      resources = binary.getResourcesMap.asScala.mapValues(deserializeResourceInformation).toMap,
+      resourceProfileId = binary.getResourceProfileId,
+      isExcluded = binary.getIsExcluded,
+      excludedInStages = binary.getExcludedInStagesList.asScala.map(_.toInt).toSet)
+  }
+
+  private def serializeMemoryMetrics(metrics: MemoryMetrics): StoreTypes.MemoryMetrics = {
+    val builder = StoreTypes.MemoryMetrics.newBuilder()
+    builder.setUsedOnHeapStorageMemory(metrics.usedOnHeapStorageMemory)
+    builder.setUsedOffHeapStorageMemory(metrics.usedOffHeapStorageMemory)
+    builder.setTotalOnHeapStorageMemory(metrics.totalOnHeapStorageMemory)
+    builder.setTotalOffHeapStorageMemory(metrics.totalOffHeapStorageMemory)
+    builder.build()
+  }
+
+  private def deserializeMemoryMetrics(binary: StoreTypes.MemoryMetrics): MemoryMetrics = {
+    new MemoryMetrics(
+      usedOnHeapStorageMemory = binary.getUsedOnHeapStorageMemory,
+      usedOffHeapStorageMemory = binary.getUsedOffHeapStorageMemory,
+      totalOnHeapStorageMemory = binary.getTotalOnHeapStorageMemory,
+      totalOffHeapStorageMemory = binary.getTotalOffHeapStorageMemory
+    )
+  }
+
+  private def serializeResourceInformation(info: ResourceInformation):
+    StoreTypes.ResourceInformation = {
+    val builder = StoreTypes.ResourceInformation.newBuilder()
+    builder.setName(info.name)
+    info.addresses.foreach(builder.addAddresses)
+    builder.build()
+  }
+
+  private def deserializeResourceInformation(binary: StoreTypes.ResourceInformation):
+    ResourceInformation = {
+    new ResourceInformation(
+      name = binary.getName,
+      addresses = binary.getAddressesList.asScala.toArray)
+  }
+}
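
A note on the helper used throughout the deserialization above:
Utils.getOptional(cond, () => value) presumably turns a Protobuf presence flag
into a Scala Option, along the lines of this minimal sketch (the real
definition lives in org.apache.spark.status.protobuf.Utils):

    // evaluate the thunk only when the field is actually present
    def getOptional[T](condition: Boolean, result: () => T): Option[T] =
      if (condition) Some(result()) else None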
diff --git a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
index 14f615587b1..22ece38ca0b 100644
--- a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
@@ -22,7 +22,7 @@ import java.util.Date
 import org.apache.spark.{JobExecutionStatus, SparkFunSuite}
 import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.metrics.ExecutorMetricType
-import org.apache.spark.resource.{ExecutorResourceRequest, TaskResourceRequest}
+import org.apache.spark.resource.{ExecutorResourceRequest, ResourceInformation, TaskResourceRequest}
 import org.apache.spark.status._
 import org.apache.spark.status.api.v1._
 
@@ -617,4 +617,133 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
     assert(result.info.numFailedTasks == input.info.numFailedTasks)
     assert(result.info.numKilledTasks == input.info.numKilledTasks)
   }
+
+  test("Executor Summary") {
+    val memoryMetrics =
+      Some(new MemoryMetrics(
+        usedOnHeapStorageMemory = 15,
+        usedOffHeapStorageMemory = 16,
+        totalOnHeapStorageMemory = 17,
+        totalOffHeapStorageMemory = 18))
+    val peakMemoryMetric =
+      Some(new ExecutorMetrics(Array(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 1024L)))
+    val resources =
+      Map("resource1" -> new ResourceInformation("re1", Array("add1", "add2")))
+    val input = new ExecutorSummaryWrapper(
+      info = new ExecutorSummary(
+        id = "id_1",
+        hostPort = "localhost:7777",
+        isActive = true,
+        rddBlocks = 1,
+        memoryUsed = 64,
+        diskUsed = 128,
+        totalCores = 2,
+        maxTasks = 6,
+        activeTasks = 5,
+        failedTasks = 4,
+        completedTasks = 3,
+        totalTasks = 7,
+        totalDuration = 8,
+        totalGCTime = 9,
+        totalInputBytes = 10,
+        totalShuffleRead = 11,
+        totalShuffleWrite = 12,
+        isBlacklisted = false,
+        maxMemory = 256,
+        addTime = new Date(13),
+        removeTime = Some(new Date(14)),
+        removeReason = Some("reason_1"),
+        executorLogs = Map("log1" -> "logs/log1.log", "log2" -> "/log/log2.log"),
+        memoryMetrics = memoryMetrics,
+        blacklistedInStages = Set(19, 20, 21),
+        peakMemoryMetrics = peakMemoryMetric,
+        attributes = Map("attri1" -> "value1", "attri2" -> "val2"),
+        resources = resources,
+        resourceProfileId = 22,
+        isExcluded = true,
+        excludedInStages = Set(23, 24)
+      )
+    )
+
+    val bytes = serializer.serialize(input)
+    val result = serializer.deserialize(bytes, classOf[ExecutorSummaryWrapper])
+
+    assert(result.info.id == input.info.id)
+    assert(result.info.hostPort == input.info.hostPort)
+    assert(result.info.isActive == input.info.isActive)
+    assert(result.info.rddBlocks == input.info.rddBlocks)
+    assert(result.info.memoryUsed == input.info.memoryUsed)
+    assert(result.info.diskUsed == input.info.diskUsed)
+    assert(result.info.totalCores == input.info.totalCores)
+    assert(result.info.maxTasks == input.info.maxTasks)
+    assert(result.info.activeTasks == input.info.activeTasks)
+    assert(result.info.failedTasks == input.info.failedTasks)
+    assert(result.info.completedTasks == input.info.completedTasks)
+    assert(result.info.totalTasks == input.info.totalTasks)
+    assert(result.info.totalDuration == input.info.totalDuration)
+    assert(result.info.totalGCTime == input.info.totalGCTime)
+    assert(result.info.totalInputBytes == input.info.totalInputBytes)
+    assert(result.info.totalShuffleRead == input.info.totalShuffleRead)
+    assert(result.info.totalShuffleWrite == input.info.totalShuffleWrite)
+    assert(result.info.isBlacklisted == input.info.isBlacklisted)
+    assert(result.info.maxMemory == input.info.maxMemory)
+    assert(result.info.addTime == input.info.addTime)
+    assert(result.info.removeTime == input.info.removeTime)
+    assert(result.info.removeReason == input.info.removeReason)
+
+    assert(result.info.executorLogs.size == input.info.executorLogs.size)
+    result.info.executorLogs.keys.foreach { k =>
+      assert(input.info.executorLogs.contains(k))
+      assert(result.info.executorLogs(k) == input.info.executorLogs(k))
+    }
+
+    assert(result.info.memoryMetrics.isDefined == input.info.memoryMetrics.isDefined)
+    if (result.info.memoryMetrics.isDefined && input.info.memoryMetrics.isDefined) {
+      assert(result.info.memoryMetrics.get.usedOnHeapStorageMemory ==
+        input.info.memoryMetrics.get.usedOnHeapStorageMemory)
+      assert(result.info.memoryMetrics.get.usedOffHeapStorageMemory ==
+        input.info.memoryMetrics.get.usedOffHeapStorageMemory)
+      assert(result.info.memoryMetrics.get.totalOnHeapStorageMemory ==
+        input.info.memoryMetrics.get.totalOnHeapStorageMemory)
+      assert(result.info.memoryMetrics.get.totalOffHeapStorageMemory ==
+        input.info.memoryMetrics.get.totalOffHeapStorageMemory)
+    }
+
+    assert(result.info.blacklistedInStages.size == input.info.blacklistedInStages.size)
+    result.info.blacklistedInStages.foreach { stage =>
+      assert(input.info.blacklistedInStages.contains(stage))
+    }
+
+    assert(result.info.peakMemoryMetrics.isDefined == input.info.peakMemoryMetrics.isDefined)
+    if (result.info.peakMemoryMetrics.isDefined && input.info.peakMemoryMetrics.isDefined) {
+      ExecutorMetricType.metricToOffset.foreach { case (name, _) =>
+        assert(result.info.peakMemoryMetrics.get.getMetricValue(name) ==
+          input.info.peakMemoryMetrics.get.getMetricValue(name))
+      }
+    }
+
+    assert(result.info.attributes.size == input.info.attributes.size)
+    result.info.attributes.keys.foreach { k =>
+      assert(input.info.attributes.contains(k))
+      assert(result.info.attributes(k) == input.info.attributes(k))
+    }
+
+    assert(result.info.resources.size == input.info.resources.size)
+    result.info.resources.keys.foreach { k =>
+      assert(input.info.resources.contains(k))
+      assert(result.info.resources(k).name == input.info.resources(k).name)
+      result.info.resources(k).addresses.zip(input.info.resources(k).addresses).foreach {
+        case (a1, a2) =>
+          assert(a1 == a2)
+      }
+    }
+
+    assert(result.info.resourceProfileId == input.info.resourceProfileId)
+    assert(result.info.isExcluded == input.info.isExcluded)
+
+    assert(result.info.excludedInStages.size == input.info.excludedInStages.size)
+    result.info.excludedInStages.foreach { stage =>
+      assert(input.info.excludedInStages.contains(stage))
+    }
+  }
 }
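
To run just this suite locally, something like the usual sbt invocation
should work (exact module path and flags may vary by setup):

    build/sbt "core/testOnly org.apache.spark.status.protobuf.KVStoreProtobufSerializerSuite"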

