dongjoon-hyun commented on code in PR #38084:
URL: https://github.com/apache/spark/pull/38084#discussion_r992816546
##########
core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala:
##########
@@ -474,40 +559,61 @@ private[spark] object SparkHadoopUtil extends Logging {
private def appendHiveConfigs(hadoopConf: Configuration): Unit = {
hiveConfKeys.foreach { kv =>
- hadoopConf.set(kv.getKey, kv.getValue)
+ hadoopConf.set(kv.getKey, kv.getValue, SOURCE_HIVE_SITE)
}
}
private def appendSparkHadoopConfigs(conf: SparkConf, hadoopConf:
Configuration): Unit = {
// Copy any "spark.hadoop.foo=bar" spark properties into conf as "foo=bar"
for ((key, value) <- conf.getAll if key.startsWith("spark.hadoop.")) {
- hadoopConf.set(key.substring("spark.hadoop.".length), value)
+ hadoopConf.set(key.substring("spark.hadoop.".length), value,
+ SOURCE_SPARK_HADOOP)
}
+ val setBySpark = SET_TO_DEFAULT_VALUES
if
(conf.getOption("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version").isEmpty)
{
- hadoopConf.set("mapreduce.fileoutputcommitter.algorithm.version", "1")
+ hadoopConf.set("mapreduce.fileoutputcommitter.algorithm.version", "1",
setBySpark)
}
- // Since Hadoop 3.3.1, HADOOP-17597 starts to throw exceptions by default
+ // In Hadoop 3.3.1, HADOOP-17597 starts to throw exceptions by default
+ // this has been reverted in 3.3.2 (HADOOP-17928); setting it to
+ // true here is harmless
if
(conf.getOption("spark.hadoop.fs.s3a.downgrade.syncable.exceptions").isEmpty) {
- hadoopConf.set("fs.s3a.downgrade.syncable.exceptions", "true")
+ hadoopConf.set("fs.s3a.downgrade.syncable.exceptions", "true",
setBySpark)
}
// In Hadoop 3.3.1, AWS region handling with the default "" endpoint only
works
// in EC2 deployments or when the AWS CLI is installed.
// The workaround is to set the name of the S3 endpoint explicitly,
// if not already set. See HADOOP-17771.
- // This change is harmless on older versions and compatible with
- // later Hadoop releases
if (hadoopConf.get("fs.s3a.endpoint", "").isEmpty &&
hadoopConf.get("fs.s3a.endpoint.region") == null) {
// set to US central endpoint which can also connect to buckets
// in other regions at the expense of a HEAD request during fs creation
- hadoopConf.set("fs.s3a.endpoint", "s3.amazonaws.com")
+ hadoopConf.set("fs.s3a.endpoint", "s3.amazonaws.com", setBySpark)
}
}
private def appendSparkHiveConfigs(conf: SparkConf, hadoopConf:
Configuration): Unit = {
// Copy any "spark.hive.foo=bar" spark properties into conf as
"hive.foo=bar"
for ((key, value) <- conf.getAll if key.startsWith("spark.hive.")) {
- hadoopConf.set(key.substring("spark.".length), value)
+ hadoopConf.set(key.substring("spark.".length), value, SOURCE_SPARK_HIVE)
+ }
+ }
+
+ /**
+ * Extract the sources of a configuration key, or a default value if
+ * the key is not found or it has no known sources.
+ * Note that options provided by credential providers (JCEKS stores etc)
+ * are not resolved, so values retrieved by Configuration.getPassword()
+ * may not be recorded as having an origin.
+ * @param hadoopConf hadoop configuration to examine.
+ * @param key key to look up
+ * @return the origin of the current entry in the configuration, or the
empty string.
+ */
+ def propertySources(hadoopConf: Configuration, key: String): String = {
+ val sources = hadoopConf.getPropertySources(key)
+ if (sources != null && sources.nonEmpty) {
+ sources.mkString("," )
Review Comment:
nit.
```scala
- sources.mkString("," )
+ sources.mkString(",")
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]