Repository: spark
Updated Branches:
  refs/heads/master dcfaeadea -> b33d6b728


[SPARK-15019][SQL] Propagate all Spark Confs to HiveConf created in 
HiveClientImpl

## What changes were proposed in this pull request?
This PR makes two changes:
1. We now propagate Spark confs to the HiveConf created in HiveClientImpl, so 
users can also use the Spark conf to set the warehouse location and the 
metastore URL (see the usage sketch below).
2. In sql/hive, HiveClientImpl becomes the only place where we create a new 
HiveConf.
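
For illustration only (not part of this patch), a minimal sketch of how these 
settings could now be passed through the Spark conf, assuming a Spark 2.0-style 
`SparkSession` with Hive support; the warehouse path and metastore host below 
are placeholders:

```scala
import org.apache.spark.sql.SparkSession

// With this change, Hive settings given on the Spark conf reach the HiveConf
// that HiveClientImpl builds internally (the key names are standard Hive
// settings; the values below are placeholders).
val spark = SparkSession.builder()
  .master("local[*]")
  .config("hive.metastore.warehouse.dir", "/tmp/my-warehouse")   // warehouse location
  .config("hive.metastore.uris", "thrift://metastore-host:9083") // metastore URL
  .enableHiveSupport()
  .getOrCreate()
```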

## How was this patch tested?
Existing tests.

Author: Yin Huai <yh...@databricks.com>

Closes #12791 from yhuai/onlyUseHiveConfInHiveClientImpl.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b33d6b72
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b33d6b72
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b33d6b72

Branch: refs/heads/master
Commit: b33d6b72886db35ea042e29a8c08cd73bf9d4b0c
Parents: dcfaead
Author: Yin Huai <yh...@databricks.com>
Authored: Fri Apr 29 17:07:15 2016 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Fri Apr 29 17:07:15 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/hive/HiveUtils.scala   | 22 +++------
 .../org/apache/spark/sql/hive/TableReader.scala |  4 +-
 .../spark/sql/hive/client/HiveClientImpl.scala  | 49 ++++++++++++++------
 .../sql/hive/execution/HiveTableScanExec.scala  |  2 +-
 .../apache/spark/sql/hive/test/TestHive.scala   |  8 ++--
 .../sql/hive/InsertIntoHiveTableSuite.scala     |  3 +-
 .../hive/ParquetHiveCompatibilitySuite.scala    |  4 +-
 7 files changed, 52 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b33d6b72/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index a856119..be89edb 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -178,7 +178,7 @@ private[spark] object HiveUtils extends Logging {
   /**
    * Configurations needed to create a [[HiveClient]].
    */
-  private[hive] def hiveClientConfigurations(hiveconf: HiveConf): Map[String, 
String] = {
+  private[hive] def hiveClientConfigurations(hadoopConf: Configuration): 
Map[String, String] = {
     // Hive 0.14.0 introduces timeout operations in HiveConf, and changes 
default values of a bunch
     // of time `ConfVar`s by adding time suffixes (`s`, `ms`, and `d` etc.).  
This breaks backwards-
     // compatibility when users are trying to connect to a Hive metastore of a 
lower version,
@@ -227,7 +227,7 @@ private[spark] object HiveUtils extends Logging {
       ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT -> TimeUnit.MILLISECONDS,
       ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT -> TimeUnit.MILLISECONDS
     ).map { case (confVar, unit) =>
-      confVar.varname -> hiveconf.getTimeVar(confVar, unit).toString
+      confVar.varname -> HiveConf.getTimeVar(hadoopConf, confVar, 
unit).toString
     }.toMap
   }
 
@@ -264,14 +264,12 @@ private[spark] object HiveUtils extends Logging {
   protected[hive] def newClientForMetadata(
       conf: SparkConf,
       hadoopConf: Configuration): HiveClient = {
-    val hiveConf = new HiveConf(hadoopConf, classOf[HiveConf])
-    val configurations = hiveClientConfigurations(hiveConf)
-    newClientForMetadata(conf, hiveConf, hadoopConf, configurations)
+    val configurations = hiveClientConfigurations(hadoopConf)
+    newClientForMetadata(conf, hadoopConf, configurations)
   }
 
   protected[hive] def newClientForMetadata(
       conf: SparkConf,
-      hiveConf: HiveConf,
       hadoopConf: Configuration,
       configurations: Map[String, String]): HiveClient = {
     val sqlConf = new SQLConf
@@ -282,12 +280,6 @@ private[spark] object HiveUtils extends Logging {
     val hiveMetastoreBarrierPrefixes = 
HiveUtils.hiveMetastoreBarrierPrefixes(sqlConf)
     val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion)
 
-    val defaultWarehouseLocation = hiveConf.get("hive.metastore.warehouse.dir")
-    logInfo("default warehouse location is " + defaultWarehouseLocation)
-
-    // `configure` goes second to override other settings.
-    val allConfig = hiveConf.asScala.map(e => e.getKey -> e.getValue).toMap ++ 
configurations
-
     val isolatedLoader = if (hiveMetastoreJars == "builtin") {
       if (hiveExecutionVersion != hiveMetastoreVersion) {
         throw new IllegalArgumentException(
@@ -321,7 +313,7 @@ private[spark] object HiveUtils extends Logging {
         sparkConf = conf,
         hadoopConf = hadoopConf,
         execJars = jars.toSeq,
-        config = allConfig,
+        config = configurations,
         isolationOn = true,
         barrierPrefixes = hiveMetastoreBarrierPrefixes,
         sharedPrefixes = hiveMetastoreSharedPrefixes)
@@ -334,7 +326,7 @@ private[spark] object HiveUtils extends Logging {
         hadoopVersion = VersionInfo.getVersion,
         sparkConf = conf,
         hadoopConf = hadoopConf,
-        config = allConfig,
+        config = configurations,
         barrierPrefixes = hiveMetastoreBarrierPrefixes,
         sharedPrefixes = hiveMetastoreSharedPrefixes)
     } else {
@@ -364,7 +356,7 @@ private[spark] object HiveUtils extends Logging {
         sparkConf = conf,
         hadoopConf = hadoopConf,
         execJars = jars.toSeq,
-        config = allConfig,
+        config = configurations,
         isolationOn = true,
         barrierPrefixes = hiveMetastoreBarrierPrefixes,
         sharedPrefixes = hiveMetastoreSharedPrefixes)

http://git-wip-us.apache.org/repos/asf/spark/blob/b33d6b72/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index df6abc2..d044811 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -105,7 +105,7 @@ class HadoopTableReader(
     // Create local references to member variables, so that the entire `this` 
object won't be
     // serialized in the closure below.
     val tableDesc = relation.tableDesc
-    val broadcastedHiveConf = _broadcastedHadoopConf
+    val broadcastedHadoopConf = _broadcastedHadoopConf
 
     val tablePath = hiveTable.getPath
     val inputPathStr = applyFilterIfNeeded(tablePath, filterOpt)
@@ -119,7 +119,7 @@ class HadoopTableReader(
     val mutableRow = new SpecificMutableRow(attributes.map(_.dataType))
 
     val deserializedHadoopRDD = hadoopRDD.mapPartitions { iter =>
-      val hconf = broadcastedHiveConf.value.value
+      val hconf = broadcastedHadoopConf.value.value
       val deserializer = deserializerClass.newInstance()
       deserializer.initialize(hconf, tableDesc.getProperties)
       HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, 
mutableRow, deserializer)

http://git-wip-us.apache.org/repos/asf/spark/blob/b33d6b72/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index c98eaa0..78ba2bf 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -56,9 +56,17 @@ import org.apache.spark.util.{CircularBuffer, Utils}
  * the 'native', execution version of Hive.  Therefore, any places where hive 
breaks compatibility
  * must use reflection after matching on `version`.
  *
+ * Every HiveClientImpl creates an internal HiveConf object. This object uses 
the given
+ * `hadoopConf` as its base. All options set in `sparkConf` are then applied 
to the HiveConf
+ * object, overriding any existing options. Finally, options in `extraConfig` 
are applied
+ * to the HiveConf object, overriding any existing options.
+ *
  * @param version the version of hive used when picking function calls that 
are not compatible.
- * @param config  a collection of configuration options that will be added to 
the hive conf before
- *                opening the hive client.
+ * @param sparkConf all configuration options set in SparkConf.
+ * @param hadoopConf the base Configuration object used by the HiveConf 
created inside
+ *                   this HiveClientImpl.
+ * @param extraConfig a collection of configuration options that will be added 
to the
+ *                hive conf before opening the hive client.
  * @param initClassLoader the classloader used when creating the `state` field 
of
  *                        this [[HiveClientImpl]].
  */
@@ -66,7 +74,7 @@ private[hive] class HiveClientImpl(
     override val version: HiveVersion,
     sparkConf: SparkConf,
     hadoopConf: Configuration,
-    config: Map[String, String],
+    extraConfig: Map[String, String],
     initClassLoader: ClassLoader,
     val clientLoader: IsolatedClientLoader)
   extends HiveClient
@@ -129,22 +137,32 @@ private[hive] class HiveClientImpl(
         // so we should keep `conf` and reuse the existing instance of 
`CliSessionState`.
         originalState
       } else {
-        val initialConf = new HiveConf(hadoopConf, classOf[SessionState])
+        val hiveConf = new HiveConf(hadoopConf, classOf[SessionState])
         // HiveConf is a Hadoop Configuration, which has a field of 
classLoader and
         // the initial value will be the current thread's context class loader
         // (i.e. initClassLoader here).
         // We call hiveConf.setClassLoader(initClassLoader) here to make
         // this action explicit.
-        initialConf.setClassLoader(initClassLoader)
-        config.foreach { case (k, v) =>
+        hiveConf.setClassLoader(initClassLoader)
+        // First, we set all spark confs to this hiveConf.
+        sparkConf.getAll.foreach { case (k, v) =>
+          if (k.toLowerCase.contains("password")) {
+            logDebug(s"Applying Spark config to Hive Conf: $k=xxx")
+          } else {
+            logDebug(s"Applying Spark config to Hive Conf: $k=$v")
+          }
+          hiveConf.set(k, v)
+        }
+        // Second, we set all entries in extraConfig to this hiveConf.
+        extraConfig.foreach { case (k, v) =>
           if (k.toLowerCase.contains("password")) {
-            logDebug(s"Hive Config: $k=xxx")
+            logDebug(s"Applying extra config to HiveConf: $k=xxx")
           } else {
-            logDebug(s"Hive Config: $k=$v")
+            logDebug(s"Applying extra config to HiveConf: $k=$v")
           }
-          initialConf.set(k, v)
+          hiveConf.set(k, v)
         }
-        val state = new SessionState(initialConf)
+        val state = new SessionState(hiveConf)
         if (clientLoader.cachedHive != null) {
           Hive.set(clientLoader.cachedHive.asInstanceOf[Hive])
         }
@@ -159,10 +177,13 @@ private[hive] class HiveClientImpl(
     ret
   }
 
+  // Log the default warehouse location.
+  logInfo(
+    s"Default warehouse location for Hive client " +
+      s"(version ${version.fullVersion}) is 
${conf.get("hive.metastore.warehouse.dir")}")
+
   /** Returns the configuration for the current session. */
-  // TODO: We should not use it because HiveSessionState has a hiveconf
-  // for the current Session.
-  def conf: HiveConf = SessionState.get().getConf
+  def conf: HiveConf = state.getConf
 
   override def getConf(key: String, defaultValue: String): String = {
     conf.get(key, defaultValue)
@@ -212,7 +233,7 @@ private[hive] class HiveClientImpl(
     false
   }
 
-  def client: Hive = {
+  private def client: Hive = {
     if (clientLoader.cachedHive != null) {
       clientLoader.cachedHive.asInstanceOf[Hive]
     } else {
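
To make the layering described in the new HiveClientImpl scaladoc concrete, 
here is a standalone sketch (not code from this patch, and using a plain Hadoop 
`Configuration` in place of a real `HiveConf`) of the order in which the three 
config sources are applied:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf

// hadoopConf is the base; sparkConf entries override it; extraConfig overrides both.
def layeredConf(
    hadoopConf: Configuration,
    sparkConf: SparkConf,
    extraConfig: Map[String, String]): Configuration = {
  val merged = new Configuration(hadoopConf)                      // 1. base Hadoop conf
  sparkConf.getAll.foreach { case (k, v) => merged.set(k, v) }    // 2. Spark confs override
  extraConfig.foreach { case (k, v) => merged.set(k, v) }         // 3. extra config wins last
  merged
}
```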

http://git-wip-us.apache.org/repos/asf/spark/blob/b33d6b72/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index b52b96a..e29864f 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -73,7 +73,7 @@ case class HiveTableScanExec(
     BindReferences.bindReference(pred, relation.partitionKeys)
   }
 
-  // Create a local copy of hiveconf,so that scan specific modifications 
should not impact
+  // Create a local copy of hadoopConf, so that scan-specific modifications 
should not impact
   // other queries
   @transient
   private[this] val hadoopConf = sparkSession.sessionState.newHadoopConf()

http://git-wip-us.apache.org/repos/asf/spark/blob/b33d6b72/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index e763b63..93646a4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -561,23 +561,21 @@ private[hive] object TestHiveContext {
       warehousePath: File,
       scratchDirPath: File,
       metastoreTemporaryConf: Map[String, String]): HiveClient = {
-    val hiveConf = new HiveConf(hadoopConf, classOf[HiveConf])
     HiveUtils.newClientForMetadata(
       conf,
-      hiveConf,
       hadoopConf,
-      hiveClientConfigurations(hiveConf, warehousePath, scratchDirPath, 
metastoreTemporaryConf))
+      hiveClientConfigurations(hadoopConf, warehousePath, scratchDirPath, 
metastoreTemporaryConf))
   }
 
   /**
    * Configurations needed to create a [[HiveClient]].
    */
   def hiveClientConfigurations(
-      hiveconf: HiveConf,
+      hadoopConf: Configuration,
       warehousePath: File,
       scratchDirPath: File,
       metastoreTemporaryConf: Map[String, String]): Map[String, String] = {
-    HiveUtils.hiveClientConfigurations(hiveconf) ++ metastoreTemporaryConf ++ 
Map(
+    HiveUtils.hiveClientConfigurations(hadoopConf) ++ metastoreTemporaryConf 
++ Map(
       ConfVars.METASTOREWAREHOUSE.varname -> warehousePath.toURI.toString,
       ConfVars.METASTORE_INTEGER_JDO_PUSHDOWN.varname -> "true",
       ConfVars.SCRATCHDIR.varname -> scratchDirPath.toURI.toString,

http://git-wip-us.apache.org/repos/asf/spark/blob/b33d6b72/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index 46de492..baf34d1 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -111,7 +111,8 @@ class InsertIntoHiveTableSuite extends QueryTest with 
TestHiveSingleton with Bef
   test("SPARK-4203:random partition directory order") {
     sql("CREATE TABLE tmp_table (key int, value string)")
     val tmpDir = Utils.createTempDir()
-    val stagingDir = new HiveConf().getVar(HiveConf.ConfVars.STAGINGDIR)
+    // The default value of hive.exec.stagingdir.
+    val stagingDir = ".hive-staging"
 
     sql(
       s"""

http://git-wip-us.apache.org/repos/asf/spark/blob/b33d6b72/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
index d789145..af4dc1b 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
@@ -29,9 +29,9 @@ import org.apache.spark.sql.internal.SQLConf
 class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with 
TestHiveSingleton {
   /**
    * Set the staging directory (and hence path to ignore Parquet files under)
-   * to that set by [[HiveConf.ConfVars.STAGINGDIR]].
+   * to the default value of hive.exec.stagingdir.
    */
-  private val stagingDir = new HiveConf().getVar(HiveConf.ConfVars.STAGINGDIR)
+  private val stagingDir = ".hive-staging"
 
   override protected def logParquetSchema(path: String): Unit = {
     val schema = readParquetSchema(path, { path =>


