mridulm commented on code in PR #42037:
URL: https://github.com/apache/spark/pull/42037#discussion_r1267527890
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala:
##########
@@ -152,24 +153,34 @@ class OrcFileFormat
       assert(supportBatch(sparkSession, resultSchema))
     }
-    OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(hadoopConf, sqlConf.caseSensitiveAnalysis)
-
-    val broadcastedConf =
-      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
     val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+    OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(hadoopConf, isCaseSensitive)
+    val broadcastedHadoopConf = if (options.isEmpty) {
+      Option.empty
+    } else {
+      Option(sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)))
+    }
Review Comment:
```suggestion
    val broadcastedHadoopConf = options.map(_ =>
      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)))
```
##########
sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala:
##########
@@ -136,20 +137,37 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
       options: Map[String, String],
       hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
-    if (sparkSession.sessionState.conf.orcFilterPushDown) {
+    val orcFilterPushDown = sparkSession.sessionState.conf.orcFilterPushDown
+    if (orcFilterPushDown) {
       // Sets pushed predicates
       OrcFilters.createFilter(requiredSchema, filters).foreach { f =>
         hadoopConf.set(OrcFileFormat.SARG_PUSHDOWN, toKryo(f))
         hadoopConf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true)
       }
     }
-    val broadcastedHadoopConf =
-      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+    val broadcastedHadoopConf = if (options.isEmpty) {
+      Option.empty
+    } else {
+      Option(sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)))
+    }
     val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
+    val sparkConf = sparkSession.sparkContext.conf
     (file: PartitionedFile) => {
-      val conf = broadcastedHadoopConf.value.value
+      val conf = if (broadcastedHadoopConf.isDefined) {
+        broadcastedHadoopConf.get.value.value
+      } else {
+        val conf = SparkHadoopUtil.newConfiguration(sparkConf)
+        if (orcFilterPushDown) {
+          // Sets pushed predicates
+          OrcFilters.createFilter(requiredSchema, filters).foreach { f =>
+            conf.set(OrcFileFormat.SARG_PUSHDOWN, toKryo(f))
+            conf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true)
+          }
+        }
+        new SerializableConfiguration(conf).value
+      }
Review Comment:
My comments in `OrcFileFormat` apply here as well.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala:
##########
@@ -152,24 +153,34 @@ class OrcFileFormat
       assert(supportBatch(sparkSession, resultSchema))
     }
-    OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(hadoopConf, sqlConf.caseSensitiveAnalysis)
-
-    val broadcastedConf =
-      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
     val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+    OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(hadoopConf, isCaseSensitive)
+    val broadcastedHadoopConf = if (options.isEmpty) {
+      Option.empty
+    } else {
+      Option(sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)))
+    }
+
     val orcFilterPushDown = sparkSession.sessionState.conf.orcFilterPushDown
+    val sparkConf = sparkSession.sparkContext.conf
     (file: PartitionedFile) => {
-      val conf = broadcastedConf.value.value
+      val sharedConf = if (broadcastedHadoopConf.isDefined) {
+        broadcastedHadoopConf.get.value.value
+      } else {
+        val conf = SparkHadoopUtil.newConfiguration(sparkConf)
+        OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(conf, isCaseSensitive)
+        new SerializableConfiguration(conf).value
+      }
Review Comment:
Simply use `conf` here? Why go through `new SerializableConfiguration(conf).value`?
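Using the names from the diff above, the else branch could just return the rebuilt `Configuration` directly. A rough sketch of that simplification (illustrative only, not a tested patch):
```scala
val sharedConf = if (broadcastedHadoopConf.isDefined) {
  broadcastedHadoopConf.get.value.value
} else {
  // Rebuild the Hadoop conf locally on the executor; nothing on this path
  // serializes it, so wrapping it in SerializableConfiguration adds nothing.
  val conf = SparkHadoopUtil.newConfiguration(sparkConf)
  OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(conf, isCaseSensitive)
  conf
}
```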
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala:
##########
@@ -94,6 +94,14 @@ trait FileFormat {
     false
   }
+  /**
+   * Returns whether Hadoop configuration needs to be broadcasted.
+   */
+  def isBroadcastHadoopConf(
Review Comment:
`isBroadcastHadoopConf` -> `shouldBroadcastHadoopConf`
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala:
##########
@@ -152,24 +153,34 @@ class OrcFileFormat
       assert(supportBatch(sparkSession, resultSchema))
     }
-    OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(hadoopConf, sqlConf.caseSensitiveAnalysis)
-
-    val broadcastedConf =
-      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
     val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+    OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(hadoopConf, isCaseSensitive)
+    val broadcastedHadoopConf = if (options.isEmpty) {
+      Option.empty
+    } else {
+      Option(sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)))
+    }
+
     val orcFilterPushDown = sparkSession.sessionState.conf.orcFilterPushDown
+    val sparkConf = sparkSession.sparkContext.conf
     (file: PartitionedFile) => {
-      val conf = broadcastedConf.value.value
+      val sharedConf = if (broadcastedHadoopConf.isDefined) {
+        broadcastedHadoopConf.get.value.value
+      } else {
+        val conf = SparkHadoopUtil.newConfiguration(sparkConf)
Review Comment:
You are assuming `hadoopConf` == `SparkHadoopUtil.newConfiguration(sparkConf)`.
This does not hold in general, particularly as the code evolves.
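To make the concern concrete (an illustrative sketch, not code from this PR): the `hadoopConf` handed to `buildReaderWithPartitionValues` is normally built on the driver via `sessionState.newHadoopConfWithOptions(options)`, which layers the session's SQL/Hadoop overrides and the per-relation datasource options on top of the `SparkConf`-derived entries, whereas `SparkHadoopUtil.newConfiguration(sparkConf)` only reproduces the `SparkConf` layer:
```scala
// Driver side: what this method is actually handed
// (SparkConf entries + session-level overrides + per-relation options).
val driverHadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(options)

// Executor side, non-broadcast branch: rebuilt from the bare SparkConf alone,
// so session- and relation-level settings are silently dropped.
val executorHadoopConf = SparkHadoopUtil.newConfiguration(sparkConf)
```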
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala:
##########
@@ -152,24 +153,34 @@ class OrcFileFormat
       assert(supportBatch(sparkSession, resultSchema))
     }
-    OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(hadoopConf, sqlConf.caseSensitiveAnalysis)
-
-    val broadcastedConf =
-      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
     val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+    OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(hadoopConf, isCaseSensitive)
+    val broadcastedHadoopConf = if (options.isEmpty) {
+      Option.empty
+    } else {
+      Option(sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)))
+    }
+
     val orcFilterPushDown = sparkSession.sessionState.conf.orcFilterPushDown
+    val sparkConf = sparkSession.sparkContext.conf
     (file: PartitionedFile) => {
-      val conf = broadcastedConf.value.value
+      val sharedConf = if (broadcastedHadoopConf.isDefined) {
Review Comment:
Rename `sharedConf` back to `conf` to minimize unnecessary diffs.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]