Repository: spark
Updated Branches:
  refs/heads/master bbd6f0c25 -> 106880edc
[SPARK-24836][SQL] New option for Avro datasource - ignoreExtension

## What changes were proposed in this pull request?

I propose to add a new option for the Avro datasource that controls whether files without the `.avro` extension are ignored on read. The option is named `ignoreExtension` and defaults to `true`. If both `ignoreExtension` and the Hadoop config `avro.mapred.ignore.inputs.without.extension` are set, `ignoreExtension` takes precedence over the Hadoop setting. Here is an example of usage:

```
spark
  .read
  .option("ignoreExtension", false)
  .avro("path to avro files")
```

## How was this patch tested?

I added a test that checks the option directly, and a test verifying that the new option overrides Hadoop's config.

Author: Maxim Gekk <maxim.g...@databricks.com>

Closes #21798 from MaxGekk/avro-ignore-extension.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/106880ed
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/106880ed
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/106880ed

Branch: refs/heads/master
Commit: 106880edcd67bc20e8610a16f8ce6aa250268eeb
Parents: bbd6f0c
Author: Maxim Gekk <maxim.g...@databricks.com>
Authored: Fri Jul 20 20:04:40 2018 -0700
Committer: Xiao Li <gatorsm...@gmail.com>
Committed: Fri Jul 20 20:04:40 2018 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/avro/AvroFileFormat.scala  | 33 +++++-------
 .../org/apache/spark/sql/avro/AvroOptions.scala | 29 +++++++++--
 .../org/apache/spark/sql/avro/AvroSuite.scala   | 55 +++++++++++++++++++-
 3 files changed, 91 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
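Before the per-file diffs, a minimal sketch of the precedence rule described above, assuming an existing `SparkSession` named `spark` and a hypothetical directory `/tmp/events` containing files with and without the `.avro` extension (`format("avro")` is the long form of the `.avro` shorthand used above):

```
// Sketch only: the path is hypothetical; spark is an existing SparkSession.
val hadoopConf = spark.sparkContext.hadoopConfiguration

// Legacy Hadoop switch: when true, files without ".avro" are skipped on read.
hadoopConf.set("avro.mapred.ignore.inputs.without.extension", "true")

// The per-read option takes precedence over the Hadoop config above,
// so every file under /tmp/events is loaded regardless of extension.
val df = spark.read
  .format("avro")
  .option("ignoreExtension", true)
  .load("/tmp/events")
```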
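The reader-side change above inverts the old guard: instead of special-casing the files to skip, it states when a file should be read. A standalone sketch of that predicate (the helper name `shouldRead` is hypothetical):

```
// Mirrors the guard in the diff above: read a file when extensions are
// ignored outright, or when the path actually ends with ".avro".
def shouldRead(ignoreExtension: Boolean, filePath: String): Boolean =
  ignoreExtension || filePath.endsWith(".avro")

assert(shouldRead(ignoreExtension = true, "part-00000"))        // read
assert(shouldRead(ignoreExtension = false, "part-00000.avro"))  // read
assert(!shouldRead(ignoreExtension = false, "part-00000"))      // Iterator.empty
```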
http://git-wip-us.apache.org/repos/asf/spark/blob/106880ed/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
index 8721eae..cd9a911 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
@@ -17,16 +17,21 @@
 
 package org.apache.spark.sql.avro
 
+import org.apache.hadoop.conf.Configuration
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 
 /**
  * Options for Avro Reader and Writer stored in case insensitive manner.
  */
-class AvroOptions(@transient val parameters: CaseInsensitiveMap[String])
-  extends Logging with Serializable {
+class AvroOptions(
+    @transient val parameters: CaseInsensitiveMap[String],
+    @transient val conf: Configuration) extends Logging with Serializable {
 
-  def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
+  def this(parameters: Map[String, String], conf: Configuration) = {
+    this(CaseInsensitiveMap(parameters), conf)
+  }
 
   /**
    * Optional schema provided by an user in JSON format.
@@ -45,4 +50,22 @@ class AvroOptions(@transient val parameters: CaseInsensitiveMap[String])
    * See Avro spec for details: https://avro.apache.org/docs/1.8.2/spec.html#schema_record .
    */
   val recordNamespace: String = parameters.getOrElse("recordNamespace", "")
+
+  /**
+   * The `ignoreExtension` option controls ignoring of files without `.avro` extensions in read.
+   * If the option is enabled, all files (with and without `.avro` extension) are loaded.
+   * If the option is not set, the Hadoop's config `avro.mapred.ignore.inputs.without.extension`
+   * is taken into account. If the former one is not set too, file extensions are ignored.
+   */
+  val ignoreExtension: Boolean = {
+    val ignoreFilesWithoutExtensionByDefault = false
+    val ignoreFilesWithoutExtension = conf.getBoolean(
+      AvroFileFormat.IgnoreFilesWithoutExtensionProperty,
+      ignoreFilesWithoutExtensionByDefault)
+
+    parameters
+      .get("ignoreExtension")
+      .map(_.toBoolean)
+      .getOrElse(!ignoreFilesWithoutExtension)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/106880ed/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index f7e9877..dad56aa 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -630,10 +630,21 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
           hadoopConf.set(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, "true")
           spark.read.avro(dir.toString)
         } finally {
-          hadoopConf.unset(AvroFileFormat.IgnoreFilesWithoutExtensionProperty)
         }
+          hadoopConf.unset(AvroFileFormat.IgnoreFilesWithoutExtensionProperty)
+        }
       }
     }
+    intercept[FileNotFoundException] {
+      withTempPath { dir =>
+        FileUtils.touch(new File(dir, "test"))
+
+        spark
+          .read
+          .option("ignoreExtension", false)
+          .avro(dir.toString)
+      }
+    }
   }
 
   test("SQL test insert overwrite") {
@@ -702,7 +713,6 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       } finally {
         hadoopConf.unset(AvroFileFormat.IgnoreFilesWithoutExtensionProperty)
       }
-
      assert(count == 8)
    }
  }
@@ -838,4 +848,45 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       assert(df2.count == 8)
     }
   }
+
+  test("SPARK-24836: checking the ignoreExtension option") {
+    withTempPath { tempDir =>
+      val df = spark.read.avro(episodesAvro)
+      assert(df.count == 8)
+
+      val tempSaveDir = s"$tempDir/save/"
+      df.write.avro(tempSaveDir)
+
+      Files.createFile(new File(tempSaveDir, "non-avro").toPath)
+
+      val newDf = spark
+        .read
+        .option("ignoreExtension", false)
+        .avro(tempSaveDir)
+
+      assert(newDf.count == 8)
+    }
+  }
+
+  test("SPARK-24836: ignoreExtension must override hadoop's config") {
+    withTempDir { dir =>
+      Files.copy(
+        Paths.get(new URL(episodesAvro).toURI),
+        Paths.get(dir.getCanonicalPath, "episodes"))
+
+      val hadoopConf = spark.sqlContext.sparkContext.hadoopConfiguration
+      val count = try {
+        hadoopConf.set(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, "true")
+        val newDf = spark
+          .read
+          .option("ignoreExtension", "true")
+          .avro(s"${dir.getCanonicalPath}/episodes")
+        newDf.count()
+      } finally {
+        hadoopConf.unset(AvroFileFormat.IgnoreFilesWithoutExtensionProperty)
+      }
+
+      assert(count == 8)
+    }
+  }
 }
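Condensed, the resolution rule implemented by `AvroOptions.ignoreExtension` reads as in the sketch below (the standalone helper name `resolveIgnoreExtension` is hypothetical; note the negation, since the Hadoop key expresses the opposite polarity):

```
import org.apache.hadoop.conf.Configuration

// option set   -> its value wins, whatever the Hadoop config says.
// option unset -> fall back to the negated Hadoop flag;
//                 both unset -> true (extensions are ignored).
def resolveIgnoreExtension(parameters: Map[String, String], conf: Configuration): Boolean = {
  val ignoreFilesWithoutExtension =
    conf.getBoolean("avro.mapred.ignore.inputs.without.extension", false)
  parameters.get("ignoreExtension").map(_.toBoolean).getOrElse(!ignoreFilesWithoutExtension)
}
```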
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org