This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 60d02b444e2 [SPARK-45316][CORE][SQL] Add new parameters `ignoreCorruptFiles`/`ignoreMissingFiles` to `HadoopRDD` and `NewHadoopRDD`
60d02b444e2 is described below

commit 60d02b444e2225b3afbe4955dabbea505e9f769c
Author: Max Gekk <max.g...@gmail.com>
AuthorDate: Tue Sep 26 17:33:07 2023 +0300

[SPARK-45316][CORE][SQL] Add new parameters `ignoreCorruptFiles`/`ignoreMissingFiles` to `HadoopRDD` and `NewHadoopRDD`

### What changes were proposed in this pull request?
In the PR, I propose to add the new parameters `ignoreCorruptFiles`/`ignoreMissingFiles` to `HadoopRDD` and `NewHadoopRDD`, and set them to the current values of:
- `spark.files.ignoreCorruptFiles`/`ignoreMissingFiles` in Spark `core`,
- `spark.sql.files.ignoreCorruptFiles`/`ignoreMissingFiles` when the RDDs are created in Spark SQL.

### Why are the changes needed?
1. To make `HadoopRDD` and `NewHadoopRDD` consistent with other RDDs like `FileScanRDD` created by Spark SQL, which take into account the SQL configs `spark.sql.files.ignoreCorruptFiles`/`ignoreMissingFiles`.
2. To improve the user experience with Spark SQL: users can control ignoring of missing files without re-creating the Spark context.

### Does this PR introduce _any_ user-facing change?
Yes. `HadoopRDD`/`NewHadoopRDD` invoked by SQL code such as Hive table scans now respect the SQL configs `spark.sql.files.ignoreCorruptFiles`/`ignoreMissingFiles` and no longer respect the core configs `spark.files.ignoreCorruptFiles`/`ignoreMissingFiles`.

### How was this patch tested?
By running the affected tests:
```
$ build/sbt "test:testOnly *QueryPartitionSuite"
$ build/sbt "test:testOnly *FileSuite"
$ build/sbt "test:testOnly *FileBasedDataSourceSuite"
```

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #43097 from MaxGekk/dynamic-ignoreMissingFiles.

Authored-by: Max Gekk <max.g...@gmail.com>
Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../scala/org/apache/spark/rdd/HadoopRDD.scala     | 31 ++++++++++++++++++----
 .../scala/org/apache/spark/rdd/NewHadoopRDD.scala  | 27 +++++++++++++++----
 docs/sql-migration-guide.md                        |  1 +
 .../org/apache/spark/sql/hive/TableReader.scala    |  9 ++++---
 .../spark/sql/hive/QueryPartitionSuite.scala       |  6 ++---
 5 files changed, 58 insertions(+), 16 deletions(-)
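For illustration only (not part of the patch): a minimal sketch of the user-facing behavior described above, assuming a Hive-enabled session and a hypothetical partitioned table named `logs_partitioned`.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: the app name and table name are illustrative, not from the commit.
object IgnoreFilesSqlConfSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ignore-missing-files-sketch")
      .enableHiveSupport()
      .getOrCreate()

    // Session-scoped SQL configs can be flipped at runtime,
    // without re-creating the SparkContext.
    spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")
    spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

    // The Hive table scan goes through HadoopRDD/NewHadoopRDD and now skips
    // missing or corrupt partition files instead of failing the query.
    spark.table("logs_partitioned").count()

    spark.stop()
  }
}
```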
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index cad107256c5..0b5f6a3d716 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -89,6 +89,8 @@ private[spark] class HadoopPartition(rddId: Int, override val index: Int, s: Inp
  * @param keyClass Class of the key associated with the inputFormatClass.
  * @param valueClass Class of the value associated with the inputFormatClass.
  * @param minPartitions Minimum number of HadoopRDD partitions (Hadoop Splits) to generate.
+ * @param ignoreCorruptFiles Whether to ignore corrupt files.
+ * @param ignoreMissingFiles Whether to ignore missing files.
  *
  * @note Instantiating this class directly is not recommended, please use
  * `org.apache.spark.SparkContext.hadoopRDD()`
@@ -101,13 +103,36 @@ class HadoopRDD[K, V](
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
     valueClass: Class[V],
-    minPartitions: Int)
+    minPartitions: Int,
+    ignoreCorruptFiles: Boolean,
+    ignoreMissingFiles: Boolean)
   extends RDD[(K, V)](sc, Nil) with Logging {
 
   if (initLocalJobConfFuncOpt.isDefined) {
     sparkContext.clean(initLocalJobConfFuncOpt.get)
   }
 
+  def this(
+      sc: SparkContext,
+      broadcastedConf: Broadcast[SerializableConfiguration],
+      initLocalJobConfFuncOpt: Option[JobConf => Unit],
+      inputFormatClass: Class[_ <: InputFormat[K, V]],
+      keyClass: Class[K],
+      valueClass: Class[V],
+      minPartitions: Int) = {
+    this(
+      sc,
+      broadcastedConf,
+      initLocalJobConfFuncOpt,
+      inputFormatClass,
+      keyClass,
+      valueClass,
+      minPartitions,
+      ignoreCorruptFiles = sc.conf.get(IGNORE_CORRUPT_FILES),
+      ignoreMissingFiles = sc.conf.get(IGNORE_MISSING_FILES)
+    )
+  }
+
   def this(
       sc: SparkContext,
       conf: JobConf,
@@ -135,10 +160,6 @@ class HadoopRDD[K, V](
 
   private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false)
 
-  private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES)
-
-  private val ignoreMissingFiles = sparkContext.conf.get(IGNORE_MISSING_FILES)
-
   private val ignoreEmptySplits = sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
 
   // Returns a JobConf that will be used on executors to obtain input splits for Hadoop reads.
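Editorial note, not part of the diff: the auxiliary constructor above keeps existing call sites source-compatible, so RDDs built through the `SparkContext` APIs should still follow the core configs. A small sketch under that assumption; the master, app name, and input path are hypothetical.

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: all names and paths here are illustrative.
object HadoopRddCoreConfSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("hadoop-rdd-core-conf-sketch")
      // Core configs keep driving RDDs created directly via SparkContext.
      .set("spark.files.ignoreMissingFiles", "true")
      .set("spark.files.ignoreCorruptFiles", "true")
    val sc = new SparkContext(conf)

    // sc.hadoopFile ends up in the 7-argument HadoopRDD constructor shown above,
    // which reads the two new flags from the core configs by default.
    val lines = sc.hadoopFile(
      "/data/raw/*.log",
      classOf[TextInputFormat],
      classOf[LongWritable],
      classOf[Text])
    println(lines.count())

    sc.stop()
  }
}
```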
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 119fdae531f..17ef3214889 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -64,6 +64,8 @@ private[spark] class NewHadoopPartition(
  * @param inputFormatClass Storage format of the data to be read.
  * @param keyClass Class of the key associated with the inputFormatClass.
  * @param valueClass Class of the value associated with the inputFormatClass.
+ * @param ignoreCorruptFiles Whether to ignore corrupt files.
+ * @param ignoreMissingFiles Whether to ignore missing files.
  *
  * @note Instantiating this class directly is not recommended, please use
  * `org.apache.spark.SparkContext.newAPIHadoopRDD()`
@@ -74,9 +76,28 @@ class NewHadoopRDD[K, V](
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
     valueClass: Class[V],
-    @transient private val _conf: Configuration)
+    @transient private val _conf: Configuration,
+    ignoreCorruptFiles: Boolean,
+    ignoreMissingFiles: Boolean)
   extends RDD[(K, V)](sc, Nil) with Logging {
 
+  def this(
+      sc : SparkContext,
+      inputFormatClass: Class[_ <: InputFormat[K, V]],
+      keyClass: Class[K],
+      valueClass: Class[V],
+      _conf: Configuration) = {
+    this(
+      sc,
+      inputFormatClass,
+      keyClass,
+      valueClass,
+      _conf,
+      ignoreCorruptFiles = sc.conf.get(IGNORE_CORRUPT_FILES),
+      ignoreMissingFiles = sc.conf.get(IGNORE_MISSING_FILES))
+  }
+
+
   // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
   private val confBroadcast = sc.broadcast(new SerializableConfiguration(_conf))
   // private val serializableConf = new SerializableWritable(_conf)
@@ -90,10 +111,6 @@ class NewHadoopRDD[K, V](
 
   private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false)
 
-  private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES)
-
-  private val ignoreMissingFiles = sparkContext.conf.get(IGNORE_MISSING_FILES)
-
   private val ignoreEmptySplits = sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
 
   def getConf: Configuration = {
diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index 61173f58b77..56a3c8292cd 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -25,6 +25,7 @@ license: |
 ## Upgrading from Spark SQL 3.5 to 4.0
 
 - Since Spark 4.0, the default value of `spark.sql.maxSinglePartitionBytes` is changed from `Long.MaxValue` to `128m`. To restore the previous behavior, set `spark.sql.maxSinglePartitionBytes` to `9223372036854775807`(`Long.MaxValue`).
+- Since Spark 4.0, any read of SQL tables takes into consideration the SQL configs `spark.sql.files.ignoreCorruptFiles`/`spark.sql.files.ignoreMissingFiles` instead of the core config `spark.files.ignoreCorruptFiles`/`spark.files.ignoreMissingFiles`.
 
 ## Upgrading from Spark SQL 3.4 to 3.5
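A brief migration sketch for the guide entry above (illustrative only; it assumes an existing `SparkSession` named `spark`, and the table name is hypothetical): pure-SQL workloads that previously relied on `spark.files.ignoreMissingFiles` when reading tables can set the session-level SQL flags instead.

```scala
// Sketch only: `spark` is an existing SparkSession; the table name is illustrative.
spark.sql("SET spark.sql.files.ignoreCorruptFiles=true")
spark.sql("SET spark.sql.files.ignoreMissingFiles=true")
spark.sql("SELECT count(*) FROM sales_partitioned").show()
```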
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index cd1d236dd36..5bb982624b0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -366,7 +366,9 @@ class HadoopTableReader(
       inputFormatClass,
       classOf[Writable],
       classOf[Writable],
-      _minSplitsPerRDD)
+      _minSplitsPerRDD,
+      ignoreCorruptFiles = conf.ignoreCorruptFiles,
+      ignoreMissingFiles = conf.ignoreMissingFiles)
 
     // Only take the value (skip the key) because Hive works only with values.
     rdd.map(_._2)
@@ -400,8 +402,9 @@ class HadoopTableReader(
       inputFormatClass,
       classOf[Writable],
       classOf[Writable],
-      jobConf
-      )
+      jobConf,
+      ignoreCorruptFiles = conf.ignoreCorruptFiles,
+      ignoreMissingFiles = conf.ignoreMissingFiles)
 
     // Only take the value (skip the key) because Hive works only with values.
     rdd.map(_._2)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
index cec6ec1ee12..f4fb18119fa 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.hive
 
 import java.sql.Timestamp
 
-import org.apache.spark.internal.config._
 import org.apache.spark.sql._
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
@@ -73,8 +72,9 @@ class QueryPartitionSuite extends QueryTest with SQLTestUtils with TestHiveSingl
   }
 
   test("Replace spark.sql.hive.verifyPartitionPath by spark.files.ignoreMissingFiles") {
-    withSQLConf(SQLConf.HIVE_VERIFY_PARTITION_PATH.key -> "false") {
-      sparkContext.conf.set(IGNORE_MISSING_FILES.key, "true")
+    withSQLConf(
+      SQLConf.HIVE_VERIFY_PARTITION_PATH.key -> "false",
+      SQLConf.IGNORE_MISSING_FILES.key -> "true") {
       queryWhenPathNotExist()
     }
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org