Repository: spark
Updated Branches:
  refs/heads/master bbd6f0c25 -> 106880edc


[SPARK-24836][SQL] New option for Avro datasource - ignoreExtension

## What changes were proposed in this pull request?

I propose to add a new option for the Avro datasource that controls ignoring 
of files without the `.avro` extension on read. The option is named 
`ignoreExtension` and has the default value `true`, meaning the extension 
check is skipped and all files are read. If both `ignoreExtension` and the 
Hadoop config `avro.mapred.ignore.inputs.without.extension` are set, 
`ignoreExtension` overrides the latter. Here is an example of usage:

```
spark
  .read
  .option("ignoreExtension", false)
  .avro("path to avro files")
```
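
If the Hadoop config is also set, the datasource option still takes 
precedence. Here is a minimal sketch of the interaction (the path is a 
placeholder, and the `.avro` reader implicit is assumed to be imported):

```
// assumes: import org.apache.spark.sql.avro._

// Hadoop config asks to read only files with the .avro extension ...
spark.sparkContext.hadoopConfiguration
  .set("avro.mapred.ignore.inputs.without.extension", "true")

// ... but the datasource option wins, so the extension check is
// skipped and all files under the path are read.
spark
  .read
  .option("ignoreExtension", true)
  .avro("path to avro files")
```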

## How was this patch tested?

I added a test that checks the option directly, and a test that checks that 
the new option overrides Hadoop's config.

Author: Maxim Gekk <maxim.g...@databricks.com>

Closes #21798 from MaxGekk/avro-ignore-extension.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/106880ed
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/106880ed
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/106880ed

Branch: refs/heads/master
Commit: 106880edcd67bc20e8610a16f8ce6aa250268eeb
Parents: bbd6f0c
Author: Maxim Gekk <maxim.g...@databricks.com>
Authored: Fri Jul 20 20:04:40 2018 -0700
Committer: Xiao Li <gatorsm...@gmail.com>
Committed: Fri Jul 20 20:04:40 2018 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/avro/AvroFileFormat.scala  | 33 +++++-------
 .../org/apache/spark/sql/avro/AvroOptions.scala | 29 +++++++++--
 .../org/apache/spark/sql/avro/AvroSuite.scala   | 55 +++++++++++++++++++-
 3 files changed, 91 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/106880ed/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
index 780e457..078efab 100755
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
@@ -58,21 +58,19 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
       options: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
     val conf = spark.sparkContext.hadoopConfiguration
-    val parsedOptions = new AvroOptions(options)
+    val parsedOptions = new AvroOptions(options, conf)
 
     // Schema evolution is not supported yet. Here we only pick a single random sample file to
     // figure out the schema of the whole dataset.
     val sampleFile =
-      if (AvroFileFormat.ignoreFilesWithoutExtensions(conf)) {
-        files.find(_.getPath.getName.endsWith(".avro")).getOrElse {
-          throw new FileNotFoundException(
-            "No Avro files found. Hadoop option 
\"avro.mapred.ignore.inputs.without.extension\" " +
-              " is set to true. Do all input files have \".avro\" extension?"
-          )
+      if (parsedOptions.ignoreExtension) {
+        files.headOption.getOrElse {
+          throw new FileNotFoundException("Files for schema inferring have 
been not found.")
         }
       } else {
-        files.headOption.getOrElse {
-          throw new FileNotFoundException("No Avro files found.")
+        files.find(_.getPath.getName.endsWith(".avro")).getOrElse {
+          throw new FileNotFoundException(
+            "No Avro files found. If files don't have .avro extension, set 
ignoreExtension to true")
         }
       }
 
@@ -115,7 +113,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
       job: Job,
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
-    val parsedOptions = new AvroOptions(options)
+    val parsedOptions = new AvroOptions(options, spark.sessionState.newHadoopConf())
     val outputAvroSchema = SchemaConverters.toAvroType(
       dataSchema, nullable = false, parsedOptions.recordName, parsedOptions.recordNamespace)
 
@@ -160,7 +158,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
 
     val broadcastedConf =
       spark.sparkContext.broadcast(new AvroFileFormat.SerializableConfiguration(hadoopConf))
-    val parsedOptions = new AvroOptions(options)
+    val parsedOptions = new AvroOptions(options, hadoopConf)
 
     (file: PartitionedFile) => {
       val log = LoggerFactory.getLogger(classOf[AvroFileFormat])
@@ -171,9 +169,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
      // Doing input file filtering is improper because we may generate empty tasks that process no
      // input files but stress the scheduler. We should probably add a more general input file
       // filtering mechanism for `FileFormat` data sources. See SPARK-16317.
-      if (AvroFileFormat.ignoreFilesWithoutExtensions(conf) && !file.filePath.endsWith(".avro")) {
-        Iterator.empty
-      } else {
+      if (parsedOptions.ignoreExtension || file.filePath.endsWith(".avro")) {
         val reader = {
           val in = new FsInput(new Path(new URI(file.filePath)), conf)
           try {
@@ -228,6 +224,8 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
             deserializer.deserialize(record).asInstanceOf[InternalRow]
           }
         }
+      } else {
+        Iterator.empty
       }
     }
   }
@@ -274,11 +272,4 @@ private[avro] object AvroFileFormat {
       value.readFields(new DataInputStream(in))
     }
   }
-
-  def ignoreFilesWithoutExtensions(conf: Configuration): Boolean = {
-    // Files without .avro extensions are not ignored by default
-    val defaultValue = false
-
-    conf.getBoolean(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, defaultValue)
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/106880ed/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
index 8721eae..cd9a911 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
@@ -17,16 +17,21 @@
 
 package org.apache.spark.sql.avro
 
+import org.apache.hadoop.conf.Configuration
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 
 /**
  * Options for Avro Reader and Writer stored in case insensitive manner.
  */
-class AvroOptions(@transient val parameters: CaseInsensitiveMap[String])
-  extends Logging with Serializable {
+class AvroOptions(
+    @transient val parameters: CaseInsensitiveMap[String],
+    @transient val conf: Configuration) extends Logging with Serializable {
 
-  def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
+  def this(parameters: Map[String, String], conf: Configuration) = {
+    this(CaseInsensitiveMap(parameters), conf)
+  }
 
   /**
    * Optional schema provided by an user in JSON format.
@@ -45,4 +50,22 @@ class AvroOptions(@transient val parameters: CaseInsensitiveMap[String])
   * See Avro spec for details: https://avro.apache.org/docs/1.8.2/spec.html#schema_record .
    */
   val recordNamespace: String = parameters.getOrElse("recordNamespace", "")
+
+  /**
+   * The `ignoreExtension` option controls ignoring of files without `.avro` extensions in read.
+   * If the option is enabled, all files (with and without `.avro` extension) are loaded.
+   * If the option is not set, the Hadoop's config `avro.mapred.ignore.inputs.without.extension`
+   * is taken into account. If the former one is not set too, file extensions are ignored.
+   */
+  val ignoreExtension: Boolean = {
+    val ignoreFilesWithoutExtensionByDefault = false
+    val ignoreFilesWithoutExtension = conf.getBoolean(
+      AvroFileFormat.IgnoreFilesWithoutExtensionProperty,
+      ignoreFilesWithoutExtensionByDefault)
+
+    parameters
+      .get("ignoreExtension")
+      .map(_.toBoolean)
+      .getOrElse(!ignoreFilesWithoutExtension)
+  }
 }
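
For reference, the resolution order implemented by `ignoreExtension` above can 
be summarized as a standalone sketch (`resolveIgnoreExtension` is a 
hypothetical helper, not part of the patch):

```
import org.apache.hadoop.conf.Configuration

// Hypothetical summary of AvroOptions.ignoreExtension: the explicit
// datasource option wins; otherwise fall back to the negated Hadoop
// config, which itself defaults to false (hence the default is true).
def resolveIgnoreExtension(
    options: Map[String, String],
    conf: Configuration): Boolean = {
  options.get("ignoreExtension") match {
    case Some(value) => value.toBoolean
    case None =>
      !conf.getBoolean("avro.mapred.ignore.inputs.without.extension", false)
  }
}
```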

http://git-wip-us.apache.org/repos/asf/spark/blob/106880ed/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index f7e9877..dad56aa 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -630,10 +630,21 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
           hadoopConf.set(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, "true")
           spark.read.avro(dir.toString)
         } finally {
-          hadoopConf.unset(AvroFileFormat.IgnoreFilesWithoutExtensionProperty)        }
+          hadoopConf.unset(AvroFileFormat.IgnoreFilesWithoutExtensionProperty)
+        }
       }
     }
 
+    intercept[FileNotFoundException] {
+      withTempPath { dir =>
+        FileUtils.touch(new File(dir, "test"))
+
+        spark
+          .read
+          .option("ignoreExtension", false)
+          .avro(dir.toString)
+      }
+    }
   }
 
   test("SQL test insert overwrite") {
@@ -702,7 +713,6 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       } finally {
         hadoopConf.unset(AvroFileFormat.IgnoreFilesWithoutExtensionProperty)
       }
-
       assert(count == 8)
     }
   }
@@ -838,4 +848,45 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       assert(df2.count == 8)
     }
   }
+
+  test("SPARK-24836: checking the ignoreExtension option") {
+    withTempPath { tempDir =>
+      val df = spark.read.avro(episodesAvro)
+      assert(df.count == 8)
+
+      val tempSaveDir = s"$tempDir/save/"
+      df.write.avro(tempSaveDir)
+
+      Files.createFile(new File(tempSaveDir, "non-avro").toPath)
+
+      val newDf = spark
+        .read
+        .option("ignoreExtension", false)
+        .avro(tempSaveDir)
+
+      assert(newDf.count == 8)
+    }
+  }
+
+  test("SPARK-24836: ignoreExtension must override hadoop's config") {
+    withTempDir { dir =>
+      Files.copy(
+        Paths.get(new URL(episodesAvro).toURI),
+        Paths.get(dir.getCanonicalPath, "episodes"))
+
+      val hadoopConf = spark.sqlContext.sparkContext.hadoopConfiguration
+      val count = try {
+        hadoopConf.set(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, "true")
+        val newDf = spark
+          .read
+          .option("ignoreExtension", "true")
+          .avro(s"${dir.getCanonicalPath}/episodes")
+        newDf.count()
+      } finally {
+        hadoopConf.unset(AvroFileFormat.IgnoreFilesWithoutExtensionProperty)
+      }
+
+      assert(count == 8)
+    }
+  }
 }

