This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 5d61626 [SPARK-31066][SQL][TEST-HIVE1.2] Disable useless and uncleaned hive SessionState initialization parts 5d61626 is described below commit 5d616262954f0b163e74c7a5527e5b3dfa913f52 Author: Kent Yao <yaooq...@hotmail.com> AuthorDate: Thu Mar 12 18:13:52 2020 +0800 [SPARK-31066][SQL][TEST-HIVE1.2] Disable useless and uncleaned hive SessionState initialization parts ### What changes were proposed in this pull request? As a common usage and according to the spark doc, users may often just copy their `hive-site.xml` to Spark directly from hive projects. Sometimes, the config file is not that clean for spark and may cause some side effects. For example, `hive.session.history.enabled` will create a log for the hive jobs but is useless for spark, and also it will not be deleted on JVM exit. This PR 1) disables `hive.session.history.enabled` explicitly to avoid creating the `hive_job_log` file, e.g. ``` Hive history file=/var/folders/01/h81cs4sn3dq2dd_k4j6fhrmc0000gn/T//kentyao/hive_job_log_79c63b29-95a4-4935-a9eb-2d89844dfe4f_493861201.txt ``` 2) sets `hive.execution.engine` to `mr` explicitly in case the config is `tez` and causes unnecessary problems like this: ``` Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/tez/dag/api/SessionNotRunning at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:529) ``` ### Why are the changes needed? reduce overhead of internal complexity and users' hive cognitive load for running spark ### Does this PR introduce any user-facing change? yes, the `hive_job_log` file will not be created even if enabled, and Spark will not try to initialize tez kinds of stuff ### How was this patch tested? add ut and verify manually Closes #27827 from yaooqinn/SPARK-31066. 
Authored-by: Kent Yao <yaooq...@hotmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> (cherry picked from commit 18f27308749e695f9942768d7ba85cef9fceb174) Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../sql/hive/thriftserver/SparkSQLCLIDriver.scala | 8 +-- .../spark/sql/hive/client/HiveClientImpl.scala | 82 +++++++++++++--------- .../spark/sql/hive/client/VersionsSuite.scala | 12 ++++ 3 files changed, 64 insertions(+), 38 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala index 19f7ea8..6b76927 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala @@ -44,6 +44,7 @@ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.hive.HiveUtils +import org.apache.spark.sql.hive.client.HiveClientImpl import org.apache.spark.sql.hive.security.HiveDelegationTokenProvider import org.apache.spark.util.ShutdownHookManager @@ -88,12 +89,7 @@ private[hive] object SparkSQLCLIDriver extends Logging { val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf) val extraConfigs = HiveUtils.formatTimeVarsForHiveClient(hadoopConf) - val cliConf = new HiveConf(classOf[SessionState]) - (hadoopConf.iterator().asScala.map(kv => kv.getKey -> kv.getValue) - ++ sparkConf.getAll.toMap ++ extraConfigs).foreach { - case (k, v) => - cliConf.set(k, v) - } + val cliConf = HiveClientImpl.newHiveConf(sparkConf, hadoopConf, extraConfigs) val sessionState = new CliSessionState(cliConf) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index f2c516e..4a3e813 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -58,7 +58,6 @@ import org.apache.spark.sql.execution.QueryExecutionException import org.apache.spark.sql.hive.HiveExternalCatalog import org.apache.spark.sql.hive.HiveExternalCatalog.{DATASOURCE_SCHEMA, DATASOURCE_SCHEMA_NUMPARTS, DATASOURCE_SCHEMA_PART_PREFIX} import org.apache.spark.sql.hive.HiveUtils -import org.apache.spark.sql.hive.client.HiveClientImpl._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.util.{CircularBuffer, Utils} @@ -99,6 +98,8 @@ private[hive] class HiveClientImpl( extends HiveClient with Logging { + import HiveClientImpl._ + // Circular buffer to hold what hive prints to STDOUT and ERR. Only printed when failures occur. private val outputBuffer = new CircularBuffer() @@ -159,36 +160,7 @@ private[hive] class HiveClientImpl( s"(version ${version.fullVersion}) is ${conf.getVar(ConfVars.METASTOREWAREHOUSE)}") private def newState(): SessionState = { - val hiveConf = new HiveConf(classOf[SessionState]) - // HiveConf is a Hadoop Configuration, which has a field of classLoader and - // the initial value will be the current thread's context class loader - // (i.e. initClassLoader at here). - // We call hiveConf.setClassLoader(initClassLoader) at here to make - // this action explicit. - hiveConf.setClassLoader(initClassLoader) - - // 1: Take all from the hadoopConf to this hiveConf. - // This hadoopConf contains user settings in Hadoop's core-site.xml file - // and Hive's hive-site.xml file. Note, we load hive-site.xml file manually in - // SharedState and put settings in this hadoopConf instead of relying on HiveConf - // to load user settings. 
Otherwise, HiveConf's initialize method will override - // settings in the hadoopConf. This issue only shows up when spark.sql.hive.metastore.jars - // is not set to builtin. When spark.sql.hive.metastore.jars is builtin, the classpath - // has hive-site.xml. So, HiveConf will use that to override its default values. - // 2: we set all spark confs to this hiveConf. - // 3: we set all entries in config to this hiveConf. - val confMap = (hadoopConf.iterator().asScala.map(kv => kv.getKey -> kv.getValue) ++ - sparkConf.getAll.toMap ++ extraConfig).toMap - confMap.foreach { case (k, v) => hiveConf.set(k, v) } - SQLConf.get.redactOptions(confMap).foreach { case (k, v) => - logDebug( - s""" - |Applying Hadoop/Hive/Spark and extra properties to Hive Conf: - |$k=$v - """.stripMargin) - } - // Disable CBO because we removed the Calcite dependency. - hiveConf.setBoolean("hive.cbo.enable", false) + val hiveConf = newHiveConf(sparkConf, hadoopConf, extraConfig, Some(initClassLoader)) val state = new SessionState(hiveConf) if (clientLoader.cachedHive != null) { Hive.set(clientLoader.cachedHive.asInstanceOf[Hive]) @@ -990,7 +962,7 @@ private[hive] class HiveClientImpl( } } -private[hive] object HiveClientImpl { +private[hive] object HiveClientImpl extends Logging { /** Converts the native StructField to Hive's FieldSchema. */ def toHiveColumn(c: StructField): FieldSchema = { val typeString = if (c.metadata.contains(HIVE_TYPE_STRING)) { @@ -1232,4 +1204,50 @@ private[hive] object HiveClientImpl { StatsSetupConst.RAW_DATA_SIZE, StatsSetupConst.TOTAL_SIZE ) + + def newHiveConf( + sparkConf: SparkConf, + hadoopConf: JIterable[JMap.Entry[String, String]], + extraConfig: Map[String, String], + classLoader: Option[ClassLoader] = None): HiveConf = { + val hiveConf = new HiveConf(classOf[SessionState]) + // HiveConf is a Hadoop Configuration, which has a field of classLoader and + // the initial value will be the current thread's context class loader. 
+ // We call hiveConf.setClassLoader(initClassLoader) at here to ensure it use the classloader + // we want. + classLoader.foreach(hiveConf.setClassLoader) + // 1: Take all from the hadoopConf to this hiveConf. + // This hadoopConf contains user settings in Hadoop's core-site.xml file + // and Hive's hive-site.xml file. Note, we load hive-site.xml file manually in + // SharedState and put settings in this hadoopConf instead of relying on HiveConf + // to load user settings. Otherwise, HiveConf's initialize method will override + // settings in the hadoopConf. This issue only shows up when spark.sql.hive.metastore.jars + // is not set to builtin. When spark.sql.hive.metastore.jars is builtin, the classpath + // has hive-site.xml. So, HiveConf will use that to override its default values. + // 2: we set all spark confs to this hiveConf. + // 3: we set all entries in config to this hiveConf. + val confMap = (hadoopConf.iterator().asScala.map(kv => kv.getKey -> kv.getValue) ++ + sparkConf.getAll.toMap ++ extraConfig).toMap + confMap.foreach { case (k, v) => hiveConf.set(k, v) } + SQLConf.get.redactOptions(confMap).foreach { case (k, v) => + logDebug(s"Applying Hadoop/Hive/Spark and extra properties to Hive Conf:$k=$v") + } + // Disable CBO because we removed the Calcite dependency. + hiveConf.setBoolean("hive.cbo.enable", false) + // If this is true, SessionState.start will create a file to log hive job which will not be + // deleted on exit and is useless for spark + if (hiveConf.getBoolean("hive.session.history.enabled", false)) { + logWarning("Detected HiveConf hive.session.history.enabled is true and will be reset to" + + " false to disable useless hive logic") + hiveConf.setBoolean("hive.session.history.enabled", false) + } + // If this is tez engine, SessionState.start might bring extra logic to initialize tez stuff, + // which is useless for spark. 
+ if (hiveConf.get("hive.execution.engine") == "tez") { + logWarning("Detected HiveConf hive.execution.engine is 'tez' and will be reset to 'mr'" + + " to disable useless hive logic") + hiveConf.set("hive.execution.engine", "mr") + } + hiveConf + } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index 4760af7..7471142 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -82,6 +82,18 @@ class VersionsSuite extends SparkFunSuite with Logging { assert("success" === client.getConf("test", null)) } + test("override useless and side-effect hive configurations ") { + val hadoopConf = new Configuration() + // These hive flags should be reset by spark + hadoopConf.setBoolean("hive.cbo.enable", true) + hadoopConf.setBoolean("hive.session.history.enabled", true) + hadoopConf.set("hive.execution.engine", "tez") + val client = buildClient(HiveUtils.builtinHiveVersion, hadoopConf) + assert(!client.getConf("hive.cbo.enable", "true").toBoolean) + assert(!client.getConf("hive.session.history.enabled", "true").toBoolean) + assert(client.getConf("hive.execution.engine", "tez") === "mr") + } + private def getNestedMessages(e: Throwable): String = { var causes = "" var lastException = e --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org