This is an automated email from the ASF dual-hosted git repository. gengliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new d0ec53cf953 [SPARK-41422][UI] Protobuf serializer for ExecutorSummaryWrapper

d0ec53cf953 is described below

commit d0ec53cf9531cba6a154b98d4593a0be50e712d8
Author: Sandeep Singh <sand...@techaddict.me>
AuthorDate: Fri Dec 23 00:30:33 2022 -0800

    [SPARK-41422][UI] Protobuf serializer for ExecutorSummaryWrapper

    ### What changes were proposed in this pull request?
    Add Protobuf serializer for ExecutorSummaryWrapper

    ### Why are the changes needed?
    Support fast and compact serialization/deserialization for ExecutorSummaryWrapper over RocksDB.

    ### Does this PR introduce any user-facing change?
    No

    ### How was this patch tested?
    New UT

    Closes #39141 from techaddict/SPARK-41422-ExecutorSummaryWrapper.

    Authored-by: Sandeep Singh <sand...@techaddict.me>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../apache/spark/status/protobuf/store_types.proto |  50 ++++++
 .../org.apache.spark.status.protobuf.ProtobufSerDe |   1 +
 .../ExecutorSummaryWrapperSerializer.scala         | 183 +++++++++++++++++++++
 .../protobuf/KVStoreProtobufSerializerSuite.scala  | 131 ++++++++++++++-
 4 files changed, 364 insertions(+), 1 deletion(-)

diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index 5949ad63c84..7cf5c2921cb 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -305,3 +305,53 @@ message SpeculationStageSummaryWrapper {
   int32 stage_attempt_id = 2;
   SpeculationStageSummary info = 3;
 }
+
+message MemoryMetrics {
+  int64 used_on_heap_storage_memory = 1;
+  int64 used_off_heap_storage_memory = 2;
+  int64 total_on_heap_storage_memory = 3;
+  int64 total_off_heap_storage_memory = 4;
+}
+
+message ResourceInformation {
+  string name = 1;
+  repeated string addresses = 2;
+}
+
+message ExecutorSummary {
+  string id = 1;
+  string host_port = 2;
+  bool is_active = 3;
+  int32 rdd_blocks = 4;
+  int64 memory_used = 5;
+  int64 disk_used = 6;
+  int32 total_cores = 7;
+  int32 max_tasks = 8;
+  int32 active_tasks = 9;
+  int32 failed_tasks = 10;
+  int32 completed_tasks = 11;
+  int32 total_tasks = 12;
+  int64 total_duration = 13;
+  int64 total_gc_time = 14;
+  int64 total_input_bytes = 15;
+  int64 total_shuffle_read = 16;
+  int64 total_shuffle_write = 17;
+  bool is_blacklisted = 18;
+  int64 max_memory = 19;
+  int64 add_time = 20;
+  optional int64 remove_time = 21;
+  optional string remove_reason = 22;
+  map<string, string> executor_logs = 23;
+  optional MemoryMetrics memory_metrics = 24;
+  repeated int64 blacklisted_in_stages = 25;
+  optional ExecutorMetrics peak_memory_metrics = 26;
+  map<string, string> attributes = 27;
+  map<string, ResourceInformation> resources = 28;
+  int32 resource_profile_id = 29;
+  bool is_excluded = 30;
+  repeated int64 excluded_in_stages = 31;
+}
+
+message ExecutorSummaryWrapper {
+  ExecutorSummary info = 1;
+}
diff --git a/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe b/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
index 97c186206bf..b714ea73b36 100644
--- a/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
+++ b/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
@@ -25,3 +25,4 @@ org.apache.spark.status.protobuf.TaskDataWrapperSerializer
 org.apache.spark.status.protobuf.JobDataWrapperSerializer
 org.apache.spark.status.protobuf.ResourceProfileWrapperSerializer
 org.apache.spark.status.protobuf.SpeculationStageSummaryWrapperSerializer
+org.apache.spark.status.protobuf.ExecutorSummaryWrapperSerializer
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala
new file mode 100644
index 00000000000..03a810157d7
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf
+
+import java.util.Date
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.resource.ResourceInformation
+import org.apache.spark.status.ExecutorSummaryWrapper
+import org.apache.spark.status.api.v1.{ExecutorSummary, MemoryMetrics}
+import org.apache.spark.status.protobuf.Utils.getOptional
+
+class ExecutorSummaryWrapperSerializer extends ProtobufSerDe {
+
+  override val supportClass: Class[_] = classOf[ExecutorSummaryWrapper]
+
+  override def serialize(input: Any): Array[Byte] = {
+    serialize(input.asInstanceOf[ExecutorSummaryWrapper])
+  }
+
+  def serialize(input: ExecutorSummaryWrapper): Array[Byte] = {
+    val info = serializeExecutorSummary(input.info)
+    val builder = StoreTypes.ExecutorSummaryWrapper.newBuilder()
+      .setInfo(info)
+    builder.build().toByteArray
+  }
+
+  def deserialize(bytes: Array[Byte]): ExecutorSummaryWrapper = {
+    val binary = StoreTypes.ExecutorSummaryWrapper.parseFrom(bytes)
+    val info = deserializeExecutorSummary(binary.getInfo)
+    new ExecutorSummaryWrapper(info = info)
+  }
+
+  private def serializeExecutorSummary(
+      input: ExecutorSummary): StoreTypes.ExecutorSummary = {
+    val builder = StoreTypes.ExecutorSummary.newBuilder()
+      .setId(input.id)
+      .setHostPort(input.hostPort)
+      .setIsActive(input.isActive)
+      .setRddBlocks(input.rddBlocks)
+      .setMemoryUsed(input.memoryUsed)
+      .setDiskUsed(input.diskUsed)
+      .setTotalCores(input.totalCores)
+      .setMaxTasks(input.maxTasks)
+      .setActiveTasks(input.activeTasks)
+      .setFailedTasks(input.failedTasks)
+      .setCompletedTasks(input.completedTasks)
+      .setTotalTasks(input.totalTasks)
+      .setTotalDuration(input.totalDuration)
+      .setTotalGcTime(input.totalGCTime)
+      .setTotalInputBytes(input.totalInputBytes)
+      .setTotalShuffleRead(input.totalShuffleRead)
+      .setTotalShuffleWrite(input.totalShuffleWrite)
+      .setIsBlacklisted(input.isBlacklisted)
+      .setMaxMemory(input.maxMemory)
+      .setAddTime(input.addTime.getTime)
+
+    input.removeTime.foreach {
+      date => builder.setRemoveTime(date.getTime)
+    }
+    input.removeReason.foreach(builder.setRemoveReason)
+    input.executorLogs.foreach { case (k, v) =>
+      builder.putExecutorLogs(k, v)
+    }
+    input.memoryMetrics.foreach { metrics =>
+      builder.setMemoryMetrics(serializeMemoryMetrics(metrics))
+    }
+    input.blacklistedInStages.foreach { stage =>
+      builder.addBlacklistedInStages(stage.toLong)
+    }
+    input.peakMemoryMetrics.foreach { metrics =>
+      builder.setPeakMemoryMetrics(ExecutorMetricsSerializer.serialize(metrics))
+    }
+    input.attributes.foreach { case (k, v) =>
+      builder.putAttributes(k, v)
+    }
+    input.resources.foreach { case (k, v) =>
+      builder.putResources(k, serializeResourceInformation(v))
+    }
+
+    builder.setResourceProfileId(input.resourceProfileId)
+    builder.setIsExcluded(input.isExcluded)
+
+    input.excludedInStages.foreach { stage =>
+      builder.addExcludedInStages(stage.toLong)
+    }
+
+    builder.build()
+  }
+
+  private def deserializeExecutorSummary(
+      binary: StoreTypes.ExecutorSummary): ExecutorSummary = {
+    val peakMemoryMetrics =
+      getOptional(binary.hasPeakMemoryMetrics,
+        () => ExecutorMetricsSerializer.deserialize(binary.getPeakMemoryMetrics))
+    val removeTime = getOptional(binary.hasRemoveTime, () => new Date(binary.getRemoveTime))
+    val removeReason = getOptional(binary.hasRemoveReason, () => binary.getRemoveReason)
+    val memoryMetrics =
+      getOptional(binary.hasMemoryMetrics,
+        () => deserializeMemoryMetrics(binary.getMemoryMetrics))
+    new ExecutorSummary(
+      id = binary.getId,
+      hostPort = binary.getHostPort,
+      isActive = binary.getIsActive,
+      rddBlocks = binary.getRddBlocks,
+      memoryUsed = binary.getMemoryUsed,
+      diskUsed = binary.getDiskUsed,
+      totalCores = binary.getTotalCores,
+      maxTasks = binary.getMaxTasks,
+      activeTasks = binary.getActiveTasks,
+      failedTasks = binary.getFailedTasks,
+      completedTasks = binary.getCompletedTasks,
+      totalTasks = binary.getTotalTasks,
+      totalDuration = binary.getTotalDuration,
+      totalGCTime = binary.getTotalGcTime,
+      totalInputBytes = binary.getTotalInputBytes,
+      totalShuffleRead = binary.getTotalShuffleRead,
+      totalShuffleWrite = binary.getTotalShuffleWrite,
+      isBlacklisted = binary.getIsBlacklisted,
+      maxMemory = binary.getMaxMemory,
+      addTime = new Date(binary.getAddTime),
+      removeTime = removeTime,
+      removeReason = removeReason,
+      executorLogs = binary.getExecutorLogsMap.asScala.toMap,
+      memoryMetrics = memoryMetrics,
+      blacklistedInStages = binary.getBlacklistedInStagesList.asScala.map(_.toInt).toSet,
+      peakMemoryMetrics = peakMemoryMetrics,
+      attributes = binary.getAttributesMap.asScala.toMap,
+      resources = binary.getResourcesMap.asScala.mapValues(deserializeResourceInformation).toMap,
+      resourceProfileId = binary.getResourceProfileId,
+      isExcluded = binary.getIsExcluded,
+      excludedInStages = binary.getExcludedInStagesList.asScala.map(_.toInt).toSet)
+  }
+
+  private def serializeMemoryMetrics(metrics: MemoryMetrics): StoreTypes.MemoryMetrics = {
+    val builder = StoreTypes.MemoryMetrics.newBuilder()
+    builder.setUsedOnHeapStorageMemory(metrics.usedOnHeapStorageMemory)
+    builder.setUsedOffHeapStorageMemory(metrics.usedOffHeapStorageMemory)
+    builder.setTotalOnHeapStorageMemory(metrics.totalOnHeapStorageMemory)
+    builder.setTotalOffHeapStorageMemory(metrics.totalOffHeapStorageMemory)
+    builder.build()
+  }
+
+  private def deserializeMemoryMetrics(binary: StoreTypes.MemoryMetrics): MemoryMetrics = {
+    new MemoryMetrics(
+      usedOnHeapStorageMemory = binary.getUsedOnHeapStorageMemory,
+      usedOffHeapStorageMemory = binary.getUsedOffHeapStorageMemory,
+      totalOnHeapStorageMemory = binary.getTotalOnHeapStorageMemory,
+      totalOffHeapStorageMemory = binary.getTotalOffHeapStorageMemory
+    )
+  }
+
+  private def serializeResourceInformation(info: ResourceInformation):
+    StoreTypes.ResourceInformation = {
+    val builder = StoreTypes.ResourceInformation.newBuilder()
+    builder.setName(info.name)
+    info.addresses.foreach(builder.addAddresses)
+    builder.build()
+  }
+
+  private def deserializeResourceInformation(binary: StoreTypes.ResourceInformation):
+    ResourceInformation = {
+    new ResourceInformation(
+      name = binary.getName,
+      addresses = binary.getAddressesList.asScala.toArray)
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
index 14f615587b1..22ece38ca0b 100644
--- a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
@@ -22,7 +22,7 @@ import java.util.Date
 import org.apache.spark.{JobExecutionStatus, SparkFunSuite}
 import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.metrics.ExecutorMetricType
-import org.apache.spark.resource.{ExecutorResourceRequest, TaskResourceRequest}
+import org.apache.spark.resource.{ExecutorResourceRequest, ResourceInformation, TaskResourceRequest}
 import org.apache.spark.status._
 import org.apache.spark.status.api.v1._
 
@@ -617,4 +617,133 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
     assert(result.info.numFailedTasks == input.info.numFailedTasks)
     assert(result.info.numKilledTasks == input.info.numKilledTasks)
   }
+
+  test("Executor Summary") {
+    val memoryMetrics =
+      Some(new MemoryMetrics(
+        usedOnHeapStorageMemory = 15,
+        usedOffHeapStorageMemory = 16,
+        totalOnHeapStorageMemory = 17,
+        totalOffHeapStorageMemory = 18))
+    val peakMemoryMetric =
+      Some(new ExecutorMetrics(Array(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 1024L)))
+    val resources =
+      Map("resource1" -> new ResourceInformation("re1", Array("add1", "add2")))
+    val input = new ExecutorSummaryWrapper(
+      info = new ExecutorSummary(
+        id = "id_1",
+        hostPort = "localhost:7777",
+        isActive = true,
+        rddBlocks = 1,
+        memoryUsed = 64,
+        diskUsed = 128,
+        totalCores = 2,
+        maxTasks = 6,
+        activeTasks = 5,
+        failedTasks = 4,
+        completedTasks = 3,
+        totalTasks = 7,
+        totalDuration = 8,
+        totalGCTime = 9,
+        totalInputBytes = 10,
+        totalShuffleRead = 11,
+        totalShuffleWrite = 12,
+        isBlacklisted = false,
+        maxMemory = 256,
+        addTime = new Date(13),
+        removeTime = Some(new Date(14)),
+        removeReason = Some("reason_1"),
+        executorLogs = Map("log1" -> "logs/log1.log", "log2" -> "/log/log2.log"),
+        memoryMetrics = memoryMetrics,
+        blacklistedInStages = Set(19, 20, 21),
+        peakMemoryMetrics = peakMemoryMetric,
+        attributes = Map("attri1" -> "value1", "attri2" -> "val2"),
+        resources = resources,
+        resourceProfileId = 22,
+        isExcluded = true,
+        excludedInStages = Set(23, 24)
+      )
+    )
+
+    val bytes = serializer.serialize(input)
+    val result = serializer.deserialize(bytes, classOf[ExecutorSummaryWrapper])
+
+    assert(result.info.id == input.info.id)
+    assert(result.info.hostPort == input.info.hostPort)
+    assert(result.info.isActive == input.info.isActive)
+    assert(result.info.rddBlocks == input.info.rddBlocks)
+    assert(result.info.memoryUsed == input.info.memoryUsed)
+    assert(result.info.diskUsed == input.info.diskUsed)
+    assert(result.info.totalCores == input.info.totalCores)
+    assert(result.info.maxTasks == input.info.maxTasks)
+    assert(result.info.activeTasks == input.info.activeTasks)
+    assert(result.info.failedTasks == input.info.failedTasks)
+    assert(result.info.completedTasks == input.info.completedTasks)
+    assert(result.info.totalTasks == input.info.totalTasks)
+    assert(result.info.totalDuration == input.info.totalDuration)
+    assert(result.info.totalGCTime == input.info.totalGCTime)
+    assert(result.info.totalInputBytes == input.info.totalInputBytes)
+    assert(result.info.totalShuffleRead == input.info.totalShuffleRead)
+    assert(result.info.totalShuffleWrite == input.info.totalShuffleWrite)
+    assert(result.info.isBlacklisted == input.info.isBlacklisted)
+    assert(result.info.maxMemory == input.info.maxMemory)
+    assert(result.info.addTime == input.info.addTime)
+    assert(result.info.removeTime == input.info.removeTime)
+    assert(result.info.removeReason == input.info.removeReason)
+
+    assert(result.info.executorLogs.size == input.info.executorLogs.size)
+    result.info.executorLogs.keys.foreach { k =>
+      assert(input.info.executorLogs.contains(k))
+      assert(result.info.executorLogs(k) == input.info.executorLogs(k))
+    }
+
+    assert(result.info.memoryMetrics.isDefined == input.info.memoryMetrics.isDefined)
+    if (result.info.memoryMetrics.isDefined && input.info.memoryMetrics.isDefined) {
+      assert(result.info.memoryMetrics.get.usedOnHeapStorageMemory ==
+        input.info.memoryMetrics.get.usedOnHeapStorageMemory)
+      assert(result.info.memoryMetrics.get.usedOffHeapStorageMemory ==
+        input.info.memoryMetrics.get.usedOffHeapStorageMemory)
+      assert(result.info.memoryMetrics.get.totalOnHeapStorageMemory ==
+        input.info.memoryMetrics.get.totalOnHeapStorageMemory)
+      assert(result.info.memoryMetrics.get.totalOffHeapStorageMemory ==
+        input.info.memoryMetrics.get.totalOffHeapStorageMemory)
+    }
+
+    assert(result.info.blacklistedInStages.size == input.info.blacklistedInStages.size)
+    result.info.blacklistedInStages.foreach { stage =>
+      assert(input.info.blacklistedInStages.contains(stage))
+    }
+
+    assert(result.info.peakMemoryMetrics.isDefined == input.info.peakMemoryMetrics.isDefined)
+    if (result.info.peakMemoryMetrics.isDefined && input.info.peakMemoryMetrics.isDefined) {
+      ExecutorMetricType.metricToOffset.foreach { case (name, _) =>
+        assert(result.info.peakMemoryMetrics.get.getMetricValue(name) ==
+          input.info.peakMemoryMetrics.get.getMetricValue(name))
+      }
+    }
+
+    assert(result.info.attributes.size == input.info.attributes.size)
+    result.info.attributes.keys.foreach { k =>
+      assert(input.info.attributes.contains(k))
+      assert(result.info.attributes(k) == input.info.attributes(k))
+    }
+
+    assert(result.info.resources.size == input.info.resources.size)
+    result.info.resources.keys.foreach { k =>
+      assert(input.info.resources.contains(k))
+      assert(result.info.resources(k).name ==
+        input.info.resources(k).name)
+      val resultAddresses = result.info.resources(k).addresses
+      val inputAddresses = input.info.resources(k).addresses
+      assert(resultAddresses.length == inputAddresses.length)
+      assert(resultAddresses.toSeq == inputAddresses.toSeq)
+    }
+
+    assert(result.info.resourceProfileId == input.info.resourceProfileId)
+    assert(result.info.isExcluded == input.info.isExcluded)
+
+    assert(result.info.excludedInStages.size == input.info.excludedInStages.size)
+    result.info.excludedInStages.foreach { stage =>
+      assert(input.info.excludedInStages.contains(stage))
+    }
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org