Repository: spark Updated Branches: refs/heads/master dcfaeadea -> b33d6b728
[SPARK-15019][SQL] Propagate all Spark Confs to HiveConf created in HiveClientImpl ## What changes were proposed in this pull request? This PR makes two changes: 1. We will propagate Spark Confs to HiveConf created in HiveClientImpl. So, users can also use spark conf to set warehouse location and metastore url. 2. In sql/hive, HiveClientImpl will be the only place where we create a new HiveConf. ## How was this patch tested? Existing tests. Author: Yin Huai <yh...@databricks.com> Closes #12791 from yhuai/onlyUseHiveConfInHiveClientImpl. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b33d6b72 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b33d6b72 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b33d6b72 Branch: refs/heads/master Commit: b33d6b72886db35ea042e29a8c08cd73bf9d4b0c Parents: dcfaead Author: Yin Huai <yh...@databricks.com> Authored: Fri Apr 29 17:07:15 2016 -0700 Committer: Andrew Or <and...@databricks.com> Committed: Fri Apr 29 17:07:15 2016 -0700 ---------------------------------------------------------------------- .../org/apache/spark/sql/hive/HiveUtils.scala | 22 +++------ .../org/apache/spark/sql/hive/TableReader.scala | 4 +- .../spark/sql/hive/client/HiveClientImpl.scala | 49 ++++++++++++++------ .../sql/hive/execution/HiveTableScanExec.scala | 2 +- .../apache/spark/sql/hive/test/TestHive.scala | 8 ++-- .../sql/hive/InsertIntoHiveTableSuite.scala | 3 +- .../hive/ParquetHiveCompatibilitySuite.scala | 4 +- 7 files changed, 52 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/b33d6b72/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala 
index a856119..be89edb 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -178,7 +178,7 @@ private[spark] object HiveUtils extends Logging { /** * Configurations needed to create a [[HiveClient]]. */ - private[hive] def hiveClientConfigurations(hiveconf: HiveConf): Map[String, String] = { + private[hive] def hiveClientConfigurations(hadoopConf: Configuration): Map[String, String] = { // Hive 0.14.0 introduces timeout operations in HiveConf, and changes default values of a bunch // of time `ConfVar`s by adding time suffixes (`s`, `ms`, and `d` etc.). This breaks backwards- // compatibility when users are trying to connecting to a Hive metastore of lower version, @@ -227,7 +227,7 @@ private[spark] object HiveUtils extends Logging { ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT -> TimeUnit.MILLISECONDS, ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT -> TimeUnit.MILLISECONDS ).map { case (confVar, unit) => - confVar.varname -> hiveconf.getTimeVar(confVar, unit).toString + confVar.varname -> HiveConf.getTimeVar(hadoopConf, confVar, unit).toString }.toMap } @@ -264,14 +264,12 @@ private[spark] object HiveUtils extends Logging { protected[hive] def newClientForMetadata( conf: SparkConf, hadoopConf: Configuration): HiveClient = { - val hiveConf = new HiveConf(hadoopConf, classOf[HiveConf]) - val configurations = hiveClientConfigurations(hiveConf) - newClientForMetadata(conf, hiveConf, hadoopConf, configurations) + val configurations = hiveClientConfigurations(hadoopConf) + newClientForMetadata(conf, hadoopConf, configurations) } protected[hive] def newClientForMetadata( conf: SparkConf, - hiveConf: HiveConf, hadoopConf: Configuration, configurations: Map[String, String]): HiveClient = { val sqlConf = new SQLConf @@ -282,12 +280,6 @@ private[spark] object HiveUtils extends Logging { val hiveMetastoreBarrierPrefixes = HiveUtils.hiveMetastoreBarrierPrefixes(sqlConf) val metaVersion 
= IsolatedClientLoader.hiveVersion(hiveMetastoreVersion) - val defaultWarehouseLocation = hiveConf.get("hive.metastore.warehouse.dir") - logInfo("default warehouse location is " + defaultWarehouseLocation) - - // `configure` goes second to override other settings. - val allConfig = hiveConf.asScala.map(e => e.getKey -> e.getValue).toMap ++ configurations - val isolatedLoader = if (hiveMetastoreJars == "builtin") { if (hiveExecutionVersion != hiveMetastoreVersion) { throw new IllegalArgumentException( @@ -321,7 +313,7 @@ private[spark] object HiveUtils extends Logging { sparkConf = conf, hadoopConf = hadoopConf, execJars = jars.toSeq, - config = allConfig, + config = configurations, isolationOn = true, barrierPrefixes = hiveMetastoreBarrierPrefixes, sharedPrefixes = hiveMetastoreSharedPrefixes) @@ -334,7 +326,7 @@ private[spark] object HiveUtils extends Logging { hadoopVersion = VersionInfo.getVersion, sparkConf = conf, hadoopConf = hadoopConf, - config = allConfig, + config = configurations, barrierPrefixes = hiveMetastoreBarrierPrefixes, sharedPrefixes = hiveMetastoreSharedPrefixes) } else { @@ -364,7 +356,7 @@ private[spark] object HiveUtils extends Logging { sparkConf = conf, hadoopConf = hadoopConf, execJars = jars.toSeq, - config = allConfig, + config = configurations, isolationOn = true, barrierPrefixes = hiveMetastoreBarrierPrefixes, sharedPrefixes = hiveMetastoreSharedPrefixes) http://git-wip-us.apache.org/repos/asf/spark/blob/b33d6b72/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index df6abc2..d044811 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -105,7 +105,7 @@ class HadoopTableReader( // Create local 
references to member variables, so that the entire `this` object won't be // serialized in the closure below. val tableDesc = relation.tableDesc - val broadcastedHiveConf = _broadcastedHadoopConf + val broadcastedHadoopConf = _broadcastedHadoopConf val tablePath = hiveTable.getPath val inputPathStr = applyFilterIfNeeded(tablePath, filterOpt) @@ -119,7 +119,7 @@ class HadoopTableReader( val mutableRow = new SpecificMutableRow(attributes.map(_.dataType)) val deserializedHadoopRDD = hadoopRDD.mapPartitions { iter => - val hconf = broadcastedHiveConf.value.value + val hconf = broadcastedHadoopConf.value.value val deserializer = deserializerClass.newInstance() deserializer.initialize(hconf, tableDesc.getProperties) HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow, deserializer) http://git-wip-us.apache.org/repos/asf/spark/blob/b33d6b72/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index c98eaa0..78ba2bf 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -56,9 +56,17 @@ import org.apache.spark.util.{CircularBuffer, Utils} * the 'native', execution version of Hive. Therefore, any places where hive breaks compatibility * must use reflection after matching on `version`. * + * Every HiveClientImpl creates an internal HiveConf object. This object is using the given + * `hadoopConf` as the base. All options set in the `sparkConf` will be applied to the HiveConf + * object and overrides any existing options. Then, options in extraConfig will be applied + * to the HiveConf object and overrides any existing options. 
+ * * @param version the version of hive used when pick function calls that are not compatible. - * @param config a collection of configuration options that will be added to the hive conf before - * opening the hive client. + * @param sparkConf all configuration options set in SparkConf. + * @param hadoopConf the base Configuration object used by the HiveConf created inside + * this HiveClientImpl. + * @param extraConfig a collection of configuration options that will be added to the + * hive conf before opening the hive client. * @param initClassLoader the classloader used when creating the `state` field of * this [[HiveClientImpl]]. */ @@ -66,7 +74,7 @@ private[hive] class HiveClientImpl( override val version: HiveVersion, sparkConf: SparkConf, hadoopConf: Configuration, - config: Map[String, String], + extraConfig: Map[String, String], initClassLoader: ClassLoader, val clientLoader: IsolatedClientLoader) extends HiveClient @@ -129,22 +137,32 @@ private[hive] class HiveClientImpl( // so we should keep `conf` and reuse the existing instance of `CliSessionState`. originalState } else { - val initialConf = new HiveConf(hadoopConf, classOf[SessionState]) + val hiveConf = new HiveConf(hadoopConf, classOf[SessionState]) // HiveConf is a Hadoop Configuration, which has a field of classLoader and // the initial value will be the current thread's context class loader // (i.e. initClassLoader at here). // We call initialConf.setClassLoader(initClassLoader) at here to make // this action explicit. - initialConf.setClassLoader(initClassLoader) - config.foreach { case (k, v) => + hiveConf.setClassLoader(initClassLoader) + // First, we set all spark confs to this hiveConf. + sparkConf.getAll.foreach { case (k, v) => + if (k.toLowerCase.contains("password")) { + logDebug(s"Applying Spark config to Hive Conf: $k=xxx") + } else { + logDebug(s"Applying Spark config to Hive Conf: $k=$v") + } + hiveConf.set(k, v) + } + // Second, we set all entries in config to this hiveConf. 
+ extraConfig.foreach { case (k, v) => if (k.toLowerCase.contains("password")) { - logDebug(s"Hive Config: $k=xxx") + logDebug(s"Applying extra config to HiveConf: $k=xxx") } else { - logDebug(s"Hive Config: $k=$v") + logDebug(s"Applying extra config to HiveConf: $k=$v") } - initialConf.set(k, v) + hiveConf.set(k, v) } - val state = new SessionState(initialConf) + val state = new SessionState(hiveConf) if (clientLoader.cachedHive != null) { Hive.set(clientLoader.cachedHive.asInstanceOf[Hive]) } @@ -159,10 +177,13 @@ private[hive] class HiveClientImpl( ret } + // Log the default warehouse location. + logInfo( + s"Default warehouse location for Hive client " + + s"(version ${version.fullVersion}) is ${conf.get("hive.metastore.warehouse.dir")}") + /** Returns the configuration for the current session. */ - // TODO: We should not use it because HiveSessionState has a hiveconf - // for the current Session. - def conf: HiveConf = SessionState.get().getConf + def conf: HiveConf = state.getConf override def getConf(key: String, defaultValue: String): String = { conf.get(key, defaultValue) @@ -212,7 +233,7 @@ private[hive] class HiveClientImpl( false } - def client: Hive = { + private def client: Hive = { if (clientLoader.cachedHive != null) { clientLoader.cachedHive.asInstanceOf[Hive] } else { http://git-wip-us.apache.org/repos/asf/spark/blob/b33d6b72/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala index b52b96a..e29864f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala @@ -73,7 +73,7 @@ case class HiveTableScanExec( BindReferences.bindReference(pred, 
relation.partitionKeys) } - // Create a local copy of hiveconf,so that scan specific modifications should not impact + // Create a local copy of hadoopConf,so that scan specific modifications should not impact // other queries @transient private[this] val hadoopConf = sparkSession.sessionState.newHadoopConf() http://git-wip-us.apache.org/repos/asf/spark/blob/b33d6b72/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index e763b63..93646a4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -561,23 +561,21 @@ private[hive] object TestHiveContext { warehousePath: File, scratchDirPath: File, metastoreTemporaryConf: Map[String, String]): HiveClient = { - val hiveConf = new HiveConf(hadoopConf, classOf[HiveConf]) HiveUtils.newClientForMetadata( conf, - hiveConf, hadoopConf, - hiveClientConfigurations(hiveConf, warehousePath, scratchDirPath, metastoreTemporaryConf)) + hiveClientConfigurations(hadoopConf, warehousePath, scratchDirPath, metastoreTemporaryConf)) } /** * Configurations needed to create a [[HiveClient]]. 
*/ def hiveClientConfigurations( - hiveconf: HiveConf, + hadoopConf: Configuration, warehousePath: File, scratchDirPath: File, metastoreTemporaryConf: Map[String, String]): Map[String, String] = { - HiveUtils.hiveClientConfigurations(hiveconf) ++ metastoreTemporaryConf ++ Map( + HiveUtils.hiveClientConfigurations(hadoopConf) ++ metastoreTemporaryConf ++ Map( ConfVars.METASTOREWAREHOUSE.varname -> warehousePath.toURI.toString, ConfVars.METASTORE_INTEGER_JDO_PUSHDOWN.varname -> "true", ConfVars.SCRATCHDIR.varname -> scratchDirPath.toURI.toString, http://git-wip-us.apache.org/repos/asf/spark/blob/b33d6b72/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala index 46de492..baf34d1 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala @@ -111,7 +111,8 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef test("SPARK-4203:random partition directory order") { sql("CREATE TABLE tmp_table (key int, value string)") val tmpDir = Utils.createTempDir() - val stagingDir = new HiveConf().getVar(HiveConf.ConfVars.STAGINGDIR) + // The default value of hive.exec.stagingdir. 
+ val stagingDir = ".hive-staging" sql( s""" http://git-wip-us.apache.org/repos/asf/spark/blob/b33d6b72/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala index d789145..af4dc1b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala @@ -29,9 +29,9 @@ import org.apache.spark.sql.internal.SQLConf class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHiveSingleton { /** * Set the staging directory (and hence path to ignore Parquet files under) - * to that set by [[HiveConf.ConfVars.STAGINGDIR]]. + * to the default value of hive.exec.stagingdir. */ - private val stagingDir = new HiveConf().getVar(HiveConf.ConfVars.STAGINGDIR) + private val stagingDir = ".hive-staging" override protected def logParquetSchema(path: String): Unit = { val schema = readParquetSchema(path, { path => --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org