Deegue closed pull request #23494: merge
URL: https://github.com/apache/spark/pull/23494
This pull request originates from a forked repository.
Because GitHub does not show the original diff of a fork-based pull
request once it has been closed or merged, the diff is reproduced
below for the sake of provenance:
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 3dad1e34af236..12b9861deab0d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -561,6 +561,23 @@ object SQLConf {
.checkValues(HiveCaseSensitiveInferenceMode.values.map(_.toString))
.createWithDefault(HiveCaseSensitiveInferenceMode.INFER_AND_SAVE.toString)
+ val HIVE_FILE_INPUT_FORMAT_ENABLED =
buildConf("spark.sql.hive.fileInputFormat.enabled")
+ .doc("When true, enable optimizing the `fileInputFormat` in Spark SQL.")
+ .booleanConf
+ .createWithDefault(false)
+
+ val HIVE_FILE_INPUT_FORMAT_SPLIT_MAXSIZE =
+ buildConf("spark.sql.hive.fileInputFormat.split.maxsize")
+ .doc("The maxsize of per split while reading Hive tables.")
+ .longConf
+ .createWithDefault(128 * 1024 * 1024)
+
+ val HIVE_FILE_INPUT_FORMAT_SPLIT_MINSIZE =
+ buildConf("spark.sql.hive.fileInputFormat.split.minsize")
+ .doc("The minsize of per split while reading Hive tables.")
+ .longConf
+ .createWithDefault(32 * 1024 * 1024)
+
val OPTIMIZER_METADATA_ONLY = buildConf("spark.sql.optimizer.metadataOnly")
.doc("When true, enable the metadata-only query optimization that use the
table's metadata " +
"to produce the partition columns instead of table scans. It applies
when all the columns " +
@@ -1681,6 +1698,12 @@ class SQLConf extends Serializable with Logging {
def caseSensitiveInferenceMode: HiveCaseSensitiveInferenceMode.Value =
HiveCaseSensitiveInferenceMode.withName(getConf(HIVE_CASE_SENSITIVE_INFERENCE))
+ def fileInputFormatEnabled: Boolean = getConf(HIVE_FILE_INPUT_FORMAT_ENABLED)
+
+ def fileInputFormatSplitMaxsize: Long =
getConf(HIVE_FILE_INPUT_FORMAT_SPLIT_MAXSIZE)
+
+ def fileInputFormatSplitMinsize: Long =
getConf(HIVE_FILE_INPUT_FORMAT_SPLIT_MINSIZE)
+
def compareDateTimestampInTimestamp : Boolean =
getConf(COMPARE_DATE_TIMESTAMP_IN_TIMESTAMP)
def gatherFastStats: Boolean = getConf(GATHER_FASTSTAT)
diff --git
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 7d57389947576..481768cd217d6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -123,8 +123,26 @@ class HadoopTableReader(
val inputPathStr = applyFilterIfNeeded(tablePath, filterOpt)
// logDebug("Table input: %s".format(tablePath))
- val ifc = hiveTable.getInputFormatClass
+ var ifc = hiveTable.getInputFormatClass
.asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
+ if (conf.fileInputFormatEnabled) {
+ hadoopConf.set("mapreduce.input.fileinputformat.split.maxsize",
+ conf.fileInputFormatSplitMaxsize.toString)
+ hadoopConf.set("mapreduce.input.fileinputformat.split.minsize",
+ conf.fileInputFormatSplitMinsize.toString)
+ if ("org.apache.hadoop.mapreduce.lib.input.TextInputFormat"
+ .equals(hiveTable.getInputFormatClass.getName)) {
+ ifc = Utils.classForName(
+ "org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat")
+ .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
+ }
+ if ("org.apache.hadoop.mapred.TextInputFormat"
+ .equals(hiveTable.getInputFormatClass.getName)) {
+ ifc = Utils.classForName(
+ "org.apache.hadoop.mapred.lib.CombineTextInputFormat")
+ .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
+ }
+ }
val hadoopRDD = createHadoopRdd(localTableDesc, inputPathStr, ifc)
val attrsWithIndex = attributes.zipWithIndex
@@ -164,7 +182,7 @@ class HadoopTableReader(
def verifyPartitionPath(
partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]]):
Map[HivePartition, Class[_ <: Deserializer]] = {
- if (!sparkSession.sessionState.conf.verifyPartitionPath) {
+ if (!conf.verifyPartitionPath) {
partitionToDeserializer
} else {
val existPathSet = collection.mutable.Set[String]()
@@ -202,8 +220,26 @@ class HadoopTableReader(
val partDesc = Utilities.getPartitionDesc(partition)
val partPath = partition.getDataLocation
val inputPathStr = applyFilterIfNeeded(partPath, filterOpt)
- val ifc = partDesc.getInputFileFormatClass
+ var ifc = partDesc.getInputFileFormatClass
.asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
+ if (conf.fileInputFormatEnabled) {
+ hadoopConf.set("mapreduce.input.fileinputformat.split.maxsize",
+ conf.fileInputFormatSplitMaxsize.toString)
+ hadoopConf.set("mapreduce.input.fileinputformat.split.minsize",
+ conf.fileInputFormatSplitMinsize.toString)
+ if ("org.apache.hadoop.mapreduce.lib.input.TextInputFormat"
+ .equals(partDesc.getInputFileFormatClass.getName)) {
+ ifc = Utils.classForName(
+ "org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat")
+ .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
+ }
+ if ("org.apache.hadoop.mapred.TextInputFormat"
+ .equals(partDesc.getInputFileFormatClass.getName)) {
+ ifc = Utils.classForName(
+ "org.apache.hadoop.mapred.lib.CombineTextInputFormat")
+ .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
+ }
+ }
// Get partition field info
val partSpec = partDesc.getPartSpec
val partProps = partDesc.getProperties
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
index 3f9bb8de42e09..e70e74d7750f7 100644
---
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
+++
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
@@ -192,4 +192,54 @@ class HiveTableScanSuite extends HiveComparisonTest with
SQLTestUtils with TestH
case p: HiveTableScanExec => p
}.get
}
-}
+
+ test("HiveTableScanExec canonicalization for different orders of partition
filters") {
+ withTable("table_old", "table_pt_old", "table_new", "table_pt_new") {
+ sql("set spark.sql.hive.fileInputFormat.enabled=true")
+ sql("set spark.sql.hive.fileInputFormat.split.maxsize=134217728")
+ sql("set spark.sql.hive.fileInputFormat.split.minsize=134217728")
+ sql(
+ s"""
+ |CREATE TABLE table_old (id int)
+ |STORED AS
+ |INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
+ |OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
+ """.stripMargin)
+ sql(
+ s"""
+ |CREATE TABLE table_pt_old (id int)
+ |PARTITIONED BY (a int, b int)
+ |STORED AS
+ |INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
+ |OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
+ """.stripMargin)
+ sql(
+ s"""
+ |CREATE TABLE table_new (id int)
+ |STORED AS
+ |INPUTFORMAT 'org.apache.hadoop.mapreduce.lib.input.TextInputFormat'
+ |OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
+ """.stripMargin)
+ sql(
+ s"""
+ |CREATE TABLE table_pt_new (id int)
+ |PARTITIONED BY (a int, b int)
+ |STORED AS
+ |INPUTFORMAT 'org.apache.hadoop.mapreduce.lib.input.TextInputFormat'
+ |OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
+ """.stripMargin)
+ intercept[Exception] {
+ sql("SELECT count(1) FROM table_old")
+ }
+ intercept[Exception] {
+ sql("SELECT count(1) FROM table_pt_old")
+ }
+ intercept[Exception] {
+ sql("SELECT count(1) FROM table_new")
+ }
+ intercept[Exception] {
+ sql("SELECT count(1) FROM table_pt_new")
+ }
+ }
+ }
+}
\ No newline at end of file
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]