[GitHub] spark pull request #22611: [SPARK-25595] Ignore corrupt Avro files if flag I...

2018-10-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/22611


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22611: [SPARK-25595] Ignore corrupt Avro files if flag I...

2018-10-03 Thread gengliangwang
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/22611#discussion_r16887
  
--- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala ---
@@ -100,6 +77,50 @@ private[avro] class AvroFileFormat extends FileFormat
 }
   }
 
+  private def inferAvroSchemaFromFiles(
+      files: Seq[FileStatus],
+      conf: Configuration,
+      ignoreExtension: Boolean): Schema = {
+    val ignoreCorruptFiles = SQLConf.get.ignoreCorruptFiles
+    // Schema evolution is not supported yet. Here we only pick first random readable sample file to
+    // figure out the schema of the whole dataset.
+    val avroReader = files.iterator.map { f =>
+      val path = f.getPath
+      if (!ignoreExtension && !path.getName.endsWith(".avro")) {
+        None
+      } else {
+        val in = new FsInput(path, conf)
+        try {
+          Some(DataFileReader.openReader(in, new GenericDatumReader[GenericRecord]()))
+        } catch {
+          case e: IOException =>
+            if (ignoreCorruptFiles) {
+              logWarning(s"Skipped the footer in the corrupted file: $path", e)
+              None
+            } else {
+              throw new SparkException(s"Could not read file: $path", e)
+            }
+        } finally {
+          in.close()
+        }
+      }
+    }.collectFirst {
+      case Some(reader) => reader
+    }
+
+    avroReader match {
+      case Some(reader) =>
+        try {
--- End diff --

This one is too trivial.


---




[GitHub] spark pull request #22611: [SPARK-25595] Ignore corrupt Avro files if flag I...

2018-10-02 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22611#discussion_r222169616
  
--- Diff: external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala ---
@@ -342,6 +342,53 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
 }
   }
 
+  private def createDummyCorruptFile(dir: File): Unit = {
+    FileUtils.forceMkdir(dir)
+    val corruptFile = new File(dir, "corrupt.avro")
+    val writer = new BufferedWriter(new FileWriter(corruptFile))
+    writer.write("corrupt")
+    writer.close()
+  }
+
+  test("Ignore corrupt Avro file if flag IGNORE_CORRUPT_FILES enabled") {
+    withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+      withTempPath { dir =>
+        createDummyCorruptFile(dir)
+        val message = intercept[FileNotFoundException] {
+          spark.read.format("avro").load(dir.getAbsolutePath).schema
+        }.getMessage
+        assert(message.contains("No Avro files found."))
+
+        val srcFile = new File("src/test/resources/episodes.avro")
+        val destFile = new File(dir, "episodes.avro")
+        FileUtils.copyFile(srcFile, destFile)
+
+        val df = spark.read.format("avro").load(srcFile.getAbsolutePath)
+        val schema = df.schema
+        val result = df.collect()
+        // Schema inference picks random readable sample file.
+        // Here we use a loop to eliminate randomness.
+        (1 to 5).foreach { _ =>
+          assert(spark.read.format("avro").load(dir.getAbsolutePath).schema == schema)
+          checkAnswer(spark.read.format("avro").load(dir.getAbsolutePath), result)
+        }
+      }
+    }
+  }
+
+  test("Throws IOException on reading corrupt Avro file if flag IGNORE_CORRUPT_FILES disabled") {
+    withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+      withTempPath { dir =>
+        createDummyCorruptFile(dir)
+        val message = intercept[org.apache.spark.SparkException] {
+          spark.read.format("avro").load(dir.getAbsolutePath).schema
--- End diff --

`.schema` probably wouldn't be needed.


---




[GitHub] spark pull request #22611: [SPARK-25595] Ignore corrupt Avro files if flag I...

2018-10-02 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22611#discussion_r222169458
  
--- Diff: external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala ---
@@ -342,6 +342,53 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
 }
   }
 
+  private def createDummyCorruptFile(dir: File): Unit = {
+    FileUtils.forceMkdir(dir)
+    val corruptFile = new File(dir, "corrupt.avro")
+    val writer = new BufferedWriter(new FileWriter(corruptFile))
+    writer.write("corrupt")
+    writer.close()
+  }
+
+  test("Ignore corrupt Avro file if flag IGNORE_CORRUPT_FILES enabled") {
+    withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+      withTempPath { dir =>
+        createDummyCorruptFile(dir)
+        val message = intercept[FileNotFoundException] {
+          spark.read.format("avro").load(dir.getAbsolutePath).schema
+        }.getMessage
+        assert(message.contains("No Avro files found."))
+
+        val srcFile = new File("src/test/resources/episodes.avro")
+        val destFile = new File(dir, "episodes.avro")
+        FileUtils.copyFile(srcFile, destFile)
+
+        val df = spark.read.format("avro").load(srcFile.getAbsolutePath)
+        val schema = df.schema
+        val result = df.collect()
+        // Schema inference picks random readable sample file.
+        // Here we use a loop to eliminate randomness.
--- End diff --

Actually I don't think it's randomness in this test. In this test, HDFS 
lists files in alphabetical order under the hood, although that's not 
guaranteed. I think the picking order here is at least deterministic.


---




[GitHub] spark pull request #22611: [SPARK-25595] Ignore corrupt Avro files if flag I...

2018-10-02 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22611#discussion_r222167078
  
--- Diff: external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala ---
@@ -342,6 +342,53 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
 }
   }
 
+  private def createDummyCorruptFile(dir: File): Unit = {
+    FileUtils.forceMkdir(dir)
+    val corruptFile = new File(dir, "corrupt.avro")
+    val writer = new BufferedWriter(new FileWriter(corruptFile))
+    writer.write("corrupt")
+    writer.close()

ditto for `tryWithResource`


---




[GitHub] spark pull request #22611: [SPARK-25595] Ignore corrupt Avro files if flag I...

2018-10-02 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22611#discussion_r222167036
  
--- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala ---
@@ -100,6 +77,50 @@ private[avro] class AvroFileFormat extends FileFormat
 }
   }
 
+  private def inferAvroSchemaFromFiles(
+      files: Seq[FileStatus],
+      conf: Configuration,
+      ignoreExtension: Boolean): Schema = {
+    val ignoreCorruptFiles = SQLConf.get.ignoreCorruptFiles
+    // Schema evolution is not supported yet. Here we only pick first random readable sample file to
+    // figure out the schema of the whole dataset.
+    val avroReader = files.iterator.map { f =>
+      val path = f.getPath
+      if (!ignoreExtension && !path.getName.endsWith(".avro")) {
+        None
+      } else {
+        val in = new FsInput(path, conf)
+        try {
+          Some(DataFileReader.openReader(in, new GenericDatumReader[GenericRecord]()))
+        } catch {
+          case e: IOException =>
+            if (ignoreCorruptFiles) {
+              logWarning(s"Skipped the footer in the corrupted file: $path", e)
+              None
+            } else {
+              throw new SparkException(s"Could not read file: $path", e)
+            }
+        } finally {
+          in.close()
+        }
+      }
+    }.collectFirst {
+      case Some(reader) => reader
+    }
+
+    avroReader match {
+      case Some(reader) =>
+        try {
--- End diff --

ditto for `tryWithResource`


---




[GitHub] spark pull request #22611: [SPARK-25595] Ignore corrupt Avro files if flag I...

2018-10-02 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22611#discussion_r222166950
  
--- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala ---
@@ -100,6 +77,50 @@ private[avro] class AvroFileFormat extends FileFormat
 }
   }
 
+  private def inferAvroSchemaFromFiles(
+      files: Seq[FileStatus],
+      conf: Configuration,
+      ignoreExtension: Boolean): Schema = {
+    val ignoreCorruptFiles = SQLConf.get.ignoreCorruptFiles
+    // Schema evolution is not supported yet. Here we only pick first random readable sample file to
+    // figure out the schema of the whole dataset.
+    val avroReader = files.iterator.map { f =>
+      val path = f.getPath
+      if (!ignoreExtension && !path.getName.endsWith(".avro")) {
+        None
+      } else {
+        val in = new FsInput(path, conf)
--- End diff --

Not a big deal, but we can use `Utils.tryWithResource`.
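
For reference, this is the loan pattern that a helper like `Utils.tryWithResource` implements. The sketch below is a minimal self-contained version of that pattern, not the Spark source; the name and signature are assumptions modeled on the suggestion above:

```scala
import java.io.Closeable

object ResourceSketch {
  // Loan pattern: create the resource, lend it to the body, and close it
  // in a finally block so it is released even when the body throws.
  def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = {
    val resource = createResource
    try f(resource) finally resource.close()
  }
}
```

Applied to the diff above, the manual try/finally around `FsInput` could then collapse to something like `tryWithResource(new FsInput(path, conf)) { in => ... }`, with the `IOException` handling kept inside the body or around the call.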


---




[GitHub] spark pull request #22611: [SPARK-25595] Ignore corrupt Avro files if flag I...

2018-10-02 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22611#discussion_r222166498
  
--- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala ---
@@ -100,6 +77,50 @@ private[avro] class AvroFileFormat extends FileFormat
 }
   }
 
+  private def inferAvroSchemaFromFiles(
+      files: Seq[FileStatus],
+      conf: Configuration,
+      ignoreExtension: Boolean): Schema = {
+    val ignoreCorruptFiles = SQLConf.get.ignoreCorruptFiles
--- End diff --

How about matching it to 
`sparkSession.sessionState.conf.ignoreCorruptFiles` like other occurrences?


---




[GitHub] spark pull request #22611: [SPARK-25595] Ignore corrupt Avro files if flag I...

2018-10-02 Thread gengliangwang
GitHub user gengliangwang opened a pull request:

https://github.com/apache/spark/pull/22611

[SPARK-25595] Ignore corrupt Avro files if flag IGNORE_CORRUPT_FILES enabled

## What changes were proposed in this pull request?

With the flag IGNORE_CORRUPT_FILES enabled, schema inference should ignore 
corrupt Avro files, which is consistent with the Parquet and ORC data sources.
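
As a hypothetical usage sketch (not part of the patch): with the flag on, reading a directory that mixes valid and corrupt `.avro` files no longer fails during schema inference. Here `spark` is assumed to be an active `SparkSession` with the Avro data source on the classpath, and the path is illustrative:

```scala
// IGNORE_CORRUPT_FILES corresponds to this existing SQL config key.
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

// With the flag enabled, schema inference skips unreadable Avro files and
// picks a readable sample file; with it disabled, the read fails with a
// SparkException pointing at the corrupt file.
val df = spark.read.format("avro").load("/path/to/mixed/avro/dir")
df.printSchema()
```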

## How was this patch tested?

Unit test

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gengliangwang/spark ignoreCorruptAvro

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22611.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22611


commit e96ea209731acba240943f890773e6eb1d87dee8
Author: Gengliang Wang 
Date:   2018-10-02T08:21:03Z

Ignore corrupt Avro file if flag IGNORE_CORRUPT_FILES enabled




---
