This is an automated email from the ASF dual-hosted git repository. gengliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new ec509b49dcaa [SPARK-47586][SQL] Hive module: Migrate logError with variables to structured logging framework ec509b49dcaa is described below commit ec509b49dcaa21d6dcdf18c1b40ac9d6df1827d7 Author: Haejoon Lee <haejoon....@databricks.com> AuthorDate: Tue Apr 9 18:22:40 2024 -0700 [SPARK-47586][SQL] Hive module: Migrate logError with variables to structured logging framework ### What changes were proposed in this pull request? This PR proposes to migrate `logError` calls with variables in the Hive module to the structured logging framework. ### Why are the changes needed? To improve the existing logging system by migrating to structured logging. ### Does this PR introduce _any_ user-facing change? No API changes, but the Hive module logs will contain MDC (Mapped Diagnostic Context) from now on. ### How was this patch tested? Ran Scala auto-formatting and style checks. Also the existing CI should pass. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #45876 from itholic/hive-logerror. 
Lead-authored-by: Haejoon Lee <haejoon....@databricks.com> Co-authored-by: Haejoon Lee <44108233+itho...@users.noreply.github.com> Signed-off-by: Gengliang Wang <gengli...@apache.org> --- .../main/scala/org/apache/spark/internal/LogKey.scala | 6 ++++++ .../scala/org/apache/spark/sql/hive/TableReader.scala | 8 ++++++-- .../apache/spark/sql/hive/client/HiveClientImpl.scala | 17 ++++++++++------- 3 files changed, 22 insertions(+), 9 deletions(-) diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala index 7fa0331515cb..2cb5eac4548c 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala @@ -51,7 +51,9 @@ object LogKey extends Enumeration { val CSV_SCHEMA_FIELD_NAME = Value val CSV_SCHEMA_FIELD_NAMES = Value val CSV_SOURCE = Value + val DATABASE_NAME = Value val DRIVER_ID = Value + val DROPPED_PARTITIONS = Value val END_POINT = Value val ERROR = Value val EVENT_LOOP = Value @@ -61,6 +63,7 @@ object LogKey extends Enumeration { val EXIT_CODE = Value val EXPRESSION_TERMS = Value val FAILURES = Value + val FIELD_NAME = Value val FUNCTION_NAME = Value val FUNCTION_PARAMETER = Value val GROUP_ID = Value @@ -92,6 +95,7 @@ object LogKey extends Enumeration { val PARSE_MODE = Value val PARTITION_ID = Value val PARTITION_SPECIFICATION = Value + val PARTITION_SPECS = Value val PATH = Value val PATHS = Value val POD_ID = Value @@ -105,6 +109,7 @@ object LogKey extends Enumeration { val REASON = Value val REDUCE_ID = Value val RELATION_NAME = Value + val REMAINING_PARTITIONS = Value val REMOTE_ADDRESS = Value val RETRY_COUNT = Value val RETRY_INTERVAL = Value @@ -124,6 +129,7 @@ object LogKey extends Enumeration { val STATEMENT_ID = Value val SUBMISSION_ID = Value val SUBSAMPLING_RATE = Value + val TABLE_NAME = Value val TASK_ATTEMPT_ID = Value val TASK_ID = Value val TASK_NAME = Value diff 
--git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index d72406f094a6..60970eecc2df 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -36,7 +36,8 @@ import org.apache.hadoop.mapred.{FileInputFormat, InputFormat => oldInputClass, import org.apache.hadoop.mapreduce.{InputFormat => newInputClass} import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey._ import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD, RDD, UnionRDD} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper} @@ -518,7 +519,10 @@ private[hive] object HadoopTableReader extends HiveInspectors with Logging { i += 1 } catch { case ex: Throwable => - logError(s"Exception thrown in field <${fieldRefs(i).getFieldName}>") + logError( + log"Exception thrown in field <${MDC(FIELD_NAME, fieldRefs(i).getFieldName)}>", + ex + ) throw ex } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 46dc56372334..92561bed1195 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -46,7 +46,8 @@ import org.apache.hadoop.security.UserGroupInformation import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.deploy.SparkHadoopUtil.SOURCE_SPARK -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey._ import org.apache.spark.metrics.source.HiveCatalogMetrics import org.apache.spark.sql.catalyst.TableIdentifier 
import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, NoSuchDatabaseException, NoSuchPartitionException, NoSuchPartitionsException, NoSuchTableException, PartitionsAlreadyExistException} @@ -686,17 +687,19 @@ private[hive] class HiveClientImpl( } catch { case e: Exception => val remainingParts = matchingParts.toBuffer --= droppedParts + // scalastyle:off line.size.limit logError( - s""" + log""" |====================== - |Attempt to drop the partition specs in table '$table' database '$db': - |${specs.mkString("\n")} + |Attempt to drop the partition specs in table '${MDC(TABLE_NAME, table)}' database '${MDC(DATABASE_NAME, db)}': + |${MDC(PARTITION_SPECS, specs.mkString("\n"))} |In this attempt, the following partitions have been dropped successfully: - |${droppedParts.mkString("\n")} + |${MDC(DROPPED_PARTITIONS, droppedParts.mkString("\n"))} |The remaining partitions have not been dropped: - |${remainingParts.mkString("\n")} + |${MDC(REMAINING_PARTITIONS, remainingParts.mkString("\n"))} |====================== """.stripMargin) + // scalastyle:on line.size.limit throw e } droppedParts += partition @@ -910,7 +913,7 @@ private[hive] class HiveClientImpl( |====================== |END HIVE FAILURE OUTPUT |====================== - """.stripMargin) + """.stripMargin, e) throw e } finally { if (state != null) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org