This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 88fc48f5e7e [SPARK-41431][CORE][SQL][UI] Protobuf serializer for `SQLExecutionUIData`
88fc48f5e7e is described below

commit 88fc48f5e7e907c25d082a7b35231744ccef2c7e
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Fri Dec 23 15:53:40 2022 -0800

    [SPARK-41431][CORE][SQL][UI] Protobuf serializer for `SQLExecutionUIData`

    ### What changes were proposed in this pull request?
    Add Protobuf serializer for `SQLExecutionUIData`

    ### Why are the changes needed?
    Support fast and compact serialization/deserialization for `SQLExecutionUIData` over RocksDB.

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    Add new UT

    Closes #39139 from LuciferYang/SPARK-41431.

    Authored-by: yangjie01 <yangji...@baidu.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
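For context, the round trip this patch enables is exercised end to end by the test suite added below; a minimal sketch (assuming `ui` is some existing `SQLExecutionUIData` instance) looks like:

    import org.apache.spark.sql.execution.ui.SQLExecutionUIData
    import org.apache.spark.status.protobuf.KVStoreProtobufSerializer

    // `ui` is assumed to be an existing SQLExecutionUIData value, e.g. one the
    // SQL tab would normally hand to the KVStore.
    val serializer = new KVStoreProtobufSerializer()
    val bytes: Array[Byte] = serializer.serialize(ui)  // compact protobuf bytes, suitable for RocksDB
    val restored: SQLExecutionUIData =
      serializer.deserialize(bytes, classOf[SQLExecutionUIData])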
---
 .../apache/spark/status/protobuf/store_types.proto | 21 +++++
 sql/core/pom.xml                                   |  5 ++
 .../org.apache.spark.status.protobuf.ProtobufSerDe | 18 +++++
 .../sql/SQLExecutionUIDataSerializer.scala         | 90 ++++++++++++++++++++++
 .../protobuf/sql/SQLPlanMetricSerializer.scala     | 36 +++++++++
 .../sql/KVStoreProtobufSerializerSuite.scala       | 88 +++++++++++++++++++++
 6 files changed, 258 insertions(+)

diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index 7cf5c2921cb..cb0dea540bd 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -355,3 +355,24 @@ message ExecutorSummary {
 message ExecutorSummaryWrapper {
   ExecutorSummary info = 1;
 }
+
+message SQLPlanMetric {
+  string name = 1;
+  int64 accumulator_id = 2;
+  string metric_type = 3;
+}
+
+message SQLExecutionUIData {
+  int64 execution_id = 1;
+  string description = 2;
+  string details = 3;
+  string physical_plan_description = 4;
+  map<string, string> modified_configs = 5;
+  repeated SQLPlanMetric metrics = 6;
+  int64 submission_time = 7;
+  optional int64 completion_time = 8;
+  optional string error_message = 9;
+  map<int64, JobExecutionStatus> jobs = 10;
+  repeated int64 stages = 11;
+  map<int64, string> metric_values = 12;
+}
diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index cfcf7455ad0..71c57f8a7f7 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -147,6 +147,11 @@
       <groupId>org.apache.xbean</groupId>
       <artifactId>xbean-asm9-shaded</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <version>${protobuf.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.scalacheck</groupId>
       <artifactId>scalacheck_${scala.binary.version}</artifactId>
diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
new file mode 100644
index 00000000000..de5f2c2d05c
--- /dev/null
+++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.spark.status.protobuf.sql.SQLExecutionUIDataSerializer
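The service registration file above is what makes the new serializer discoverable: `ProtobufSerDe` implementations are presumably picked up through the standard `java.util.ServiceLoader` mechanism and keyed by their `supportClass`. A hypothetical sketch of that discovery step (not the exact loading code in `KVStoreProtobufSerializer`):

    import java.util.ServiceLoader

    import collection.JavaConverters._

    import org.apache.spark.status.protobuf.ProtobufSerDe

    // Hypothetical sketch: every class named in a
    // META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe file on
    // the classpath is instantiated and indexed by the UI type it supports.
    val serDes: Map[Class[_], ProtobufSerDe] =
      ServiceLoader.load(classOf[ProtobufSerDe]).asScala
        .map(serDe => serDe.supportClass -> serDe)
        .toMap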
diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
new file mode 100644
index 00000000000..8dc28517ff0
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf.sql
+
+import java.util.Date
+
+import collection.JavaConverters._
+
+import org.apache.spark.JobExecutionStatus
+import org.apache.spark.sql.execution.ui.SQLExecutionUIData
+import org.apache.spark.status.protobuf.{ProtobufSerDe, StoreTypes}
+import org.apache.spark.status.protobuf.Utils.getOptional
+
+class SQLExecutionUIDataSerializer extends ProtobufSerDe {
+
+  override val supportClass: Class[_] = classOf[SQLExecutionUIData]
+
+  override def serialize(input: Any): Array[Byte] = {
+    val ui = input.asInstanceOf[SQLExecutionUIData]
+    val builder = StoreTypes.SQLExecutionUIData.newBuilder()
+    builder.setExecutionId(ui.executionId)
+    builder.setDescription(ui.description)
+    builder.setDetails(ui.details)
+    builder.setPhysicalPlanDescription(ui.physicalPlanDescription)
+    ui.modifiedConfigs.foreach {
+      case (k, v) => builder.putModifiedConfigs(k, v)
+    }
+    ui.metrics.foreach(m => builder.addMetrics(SQLPlanMetricSerializer.serialize(m)))
+    builder.setSubmissionTime(ui.submissionTime)
+    ui.completionTime.foreach(ct => builder.setCompletionTime(ct.getTime))
+    ui.errorMessage.foreach(builder.setErrorMessage)
+    ui.jobs.foreach {
+      case (id, status) =>
+        builder.putJobs(id.toLong, StoreTypes.JobExecutionStatus.valueOf(status.toString))
+    }
+    ui.stages.foreach(stageId => builder.addStages(stageId.toLong))
+    val metricValues = ui.metricValues
+    if (metricValues != null) {
+      metricValues.foreach {
+        case (k, v) => builder.putMetricValues(k, v)
+      }
+    }
+    builder.build().toByteArray
+  }
+
+  override def deserialize(bytes: Array[Byte]): SQLExecutionUIData = {
+    val ui = StoreTypes.SQLExecutionUIData.parseFrom(bytes)
+    val completionTime =
+      getOptional(ui.hasCompletionTime, () => new Date(ui.getCompletionTime))
+    val errorMessage = getOptional(ui.hasErrorMessage, () => ui.getErrorMessage)
+    val metrics =
+      ui.getMetricsList.asScala.map(m => SQLPlanMetricSerializer.deserialize(m)).toSeq
+    val jobs = ui.getJobsMap.asScala.map {
+      case (jobId, status) => jobId.toInt -> JobExecutionStatus.valueOf(status.toString)
+    }.toMap
+    val metricValues = ui.getMetricValuesMap.asScala.map {
+      case (k, v) => k.toLong -> v
+    }.toMap
+
+    new SQLExecutionUIData(
+      executionId = ui.getExecutionId,
+      description = ui.getDescription,
+      details = ui.getDetails,
+      physicalPlanDescription = ui.getPhysicalPlanDescription,
+      modifiedConfigs = ui.getModifiedConfigsMap.asScala.toMap,
+      metrics = metrics,
+      submissionTime = ui.getSubmissionTime,
+      completionTime = completionTime,
+      errorMessage = errorMessage,
+      jobs = jobs,
+      stages = ui.getStagesList.asScala.map(_.toInt).toSet,
+      metricValues = metricValues
+    )
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLPlanMetricSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLPlanMetricSerializer.scala
new file mode 100644
index 00000000000..8886bba2f92
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLPlanMetricSerializer.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf.sql
+
+import org.apache.spark.sql.execution.ui.SQLPlanMetric
+import org.apache.spark.status.protobuf.StoreTypes
+
+object SQLPlanMetricSerializer {
+
+  def serialize(metric: SQLPlanMetric): StoreTypes.SQLPlanMetric = {
+    StoreTypes.SQLPlanMetric.newBuilder()
+      .setName(metric.name)
+      .setAccumulatorId(metric.accumulatorId)
+      .setMetricType(metric.metricType)
+      .build()
+  }
+
+  def deserialize(metrics: StoreTypes.SQLPlanMetric): SQLPlanMetric = {
+    SQLPlanMetric(metrics.getName, metrics.getAccumulatorId, metrics.getMetricType)
+  }
+}
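Note that the two optional proto fields (`completion_time`, `error_message`) are read back through `Utils.getOptional`, whose body is not part of this diff. From the call sites above it is presumably the usual guard-then-build helper, along these lines:

    // Inferred shape of org.apache.spark.status.protobuf.Utils.getOptional:
    // evaluate the thunk only when the corresponding protobuf `has*` flag is set.
    def getOptional[T](condition: Boolean, result: () => T): Option[T] =
      if (condition) Some(result()) else None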
diff --git a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
new file mode 100644
index 00000000000..9d6a938c3fe
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf.sql
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.execution.ui.SQLExecutionUIData
+import org.apache.spark.status.api.v1.sql.SqlResourceSuite
+import org.apache.spark.status.protobuf.KVStoreProtobufSerializer
+
+class KVStoreProtobufSerializerSuite extends SparkFunSuite {
+
+  private val serializer = new KVStoreProtobufSerializer()
+
+  test("SQLExecutionUIData") {
+    val input = SqlResourceSuite.sqlExecutionUIData
+    val bytes = serializer.serialize(input)
+    val result = serializer.deserialize(bytes, classOf[SQLExecutionUIData])
+    assert(result.executionId == input.executionId)
+    assert(result.description == input.description)
+    assert(result.details == input.details)
+    assert(result.physicalPlanDescription == input.physicalPlanDescription)
+    assert(result.modifiedConfigs == input.modifiedConfigs)
+    assert(result.metrics == input.metrics)
+    assert(result.submissionTime == input.submissionTime)
+    assert(result.completionTime == input.completionTime)
+    assert(result.errorMessage == input.errorMessage)
+    assert(result.jobs == input.jobs)
+    assert(result.stages == input.stages)
+    assert(result.metricValues == input.metricValues)
+  }
+
+  test("SQLExecutionUIData with metricValues is empty map and null") {
+    val templateData = SqlResourceSuite.sqlExecutionUIData
+
+    val input1 = new SQLExecutionUIData(
+      executionId = templateData.executionId,
+      description = templateData.description,
+      details = templateData.details,
+      physicalPlanDescription = templateData.physicalPlanDescription,
+      modifiedConfigs = templateData.modifiedConfigs,
+      metrics = templateData.metrics,
+      submissionTime = templateData.submissionTime,
+      completionTime = templateData.completionTime,
+      errorMessage = templateData.errorMessage,
+      jobs = templateData.jobs,
+      stages = templateData.stages,
+      metricValues = Map.empty
+    )
+    val bytes1 = serializer.serialize(input1)
+    val result1 = serializer.deserialize(bytes1, classOf[SQLExecutionUIData])
+    // input.metricValues is empty map, result.metricValues is empty map.
+    assert(result1.metricValues.isEmpty)
+
+    val input2 = new SQLExecutionUIData(
+      executionId = templateData.executionId,
+      description = templateData.description,
+      details = templateData.details,
+      physicalPlanDescription = templateData.physicalPlanDescription,
+      modifiedConfigs = templateData.modifiedConfigs,
+      metrics = templateData.metrics,
+      submissionTime = templateData.submissionTime,
+      completionTime = templateData.completionTime,
+      errorMessage = templateData.errorMessage,
+      jobs = templateData.jobs,
+      stages = templateData.stages,
+      metricValues = null
+    )
+    val bytes2 = serializer.serialize(input2)
+    val result2 = serializer.deserialize(bytes2, classOf[SQLExecutionUIData])
+    // input.metricValues is null, result.metricValues is also empty map.
+    assert(result2.metricValues.isEmpty)
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org