[GitHub] spark pull request #22611: [SPARK-25595] Ignore corrupt Avro files if flag I...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/22611

---

- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user gengliangwang commented on a diff in the pull request: https://github.com/apache/spark/pull/22611#discussion_r16887

--- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala ---
@@ -100,6 +77,50 @@ private[avro] class AvroFileFormat extends FileFormat
     }
   }

+  private def inferAvroSchemaFromFiles(
+      files: Seq[FileStatus],
+      conf: Configuration,
+      ignoreExtension: Boolean): Schema = {
+    val ignoreCorruptFiles = SQLConf.get.ignoreCorruptFiles
+    // Schema evolution is not supported yet. Here we only pick first random readable sample file
+    // to figure out the schema of the whole dataset.
+    val avroReader = files.iterator.map { f =>
+      val path = f.getPath
+      if (!ignoreExtension && !path.getName.endsWith(".avro")) {
+        None
+      } else {
+        val in = new FsInput(path, conf)
+        try {
+          Some(DataFileReader.openReader(in, new GenericDatumReader[GenericRecord]()))
+        } catch {
+          case e: IOException =>
+            if (ignoreCorruptFiles) {
+              logWarning(s"Skipped the footer in the corrupted file: $path", e)
+              None
+            } else {
+              throw new SparkException(s"Could not read file: $path", e)
+            }
+        } finally {
+          in.close()
+        }
+      }
+    }.collectFirst {
+      case Some(reader) => reader
+    }
+
+    avroReader match {
+      case Some(reader) =>
+        try {
--- End diff --

This one is too trivial.
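The iterator-plus-collectFirst shape in the diff above is worth noting: only files up to the first readable one are ever opened. A minimal standalone sketch of that pattern, with no Spark dependency (the file names and the `tryOpen` stand-in for `DataFileReader.openReader` are hypothetical):

```scala
// Sketch of the lazy "first readable sample" pattern from the diff above:
// Iterator.map is lazy, so collectFirst stops probing files as soon as one
// opens successfully; files after the first readable one are never touched.
val files = Seq("corrupt.avro", "episodes.avro", "more.avro") // hypothetical file names
var probed = List.empty[String]

// Stand-in for DataFileReader.openReader: "corrupt" files fail to open and,
// as with ignoreCorruptFiles enabled, are skipped by returning None.
def tryOpen(name: String): Option[String] = {
  probed = name :: probed
  if (name.startsWith("corrupt")) None else Some(name)
}

val firstReadable = files.iterator.map(tryOpen).collectFirst {
  case Some(reader) => reader
}

println(firstReadable)   // Some(episodes.avro)
println(probed.reverse)  // List(corrupt.avro, episodes.avro) -- "more.avro" was never probed
```

Had the code used `files.map(...)` instead of `files.iterator.map(...)`, every file would be opened eagerly; the iterator is what makes the probing stop early.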
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22611#discussion_r222169616

--- Diff: external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala ---
@@ -342,6 +342,53 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils
     }
   }

+  private def createDummyCorruptFile(dir: File): Unit = {
+    FileUtils.forceMkdir(dir)
+    val corruptFile = new File(dir, "corrupt.avro")
+    val writer = new BufferedWriter(new FileWriter(corruptFile))
+    writer.write("corrupt")
+    writer.close()
+  }
+
+  test("Ignore corrupt Avro file if flag IGNORE_CORRUPT_FILES enabled") {
+    withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+      withTempPath { dir =>
+        createDummyCorruptFile(dir)
+        val message = intercept[FileNotFoundException] {
+          spark.read.format("avro").load(dir.getAbsolutePath).schema
+        }.getMessage
+        assert(message.contains("No Avro files found."))
+
+        val srcFile = new File("src/test/resources/episodes.avro")
+        val destFile = new File(dir, "episodes.avro")
+        FileUtils.copyFile(srcFile, destFile)
+
+        val df = spark.read.format("avro").load(srcFile.getAbsolutePath)
+        val schema = df.schema
+        val result = df.collect()
+        // Schema inference picks random readable sample file.
+        // Here we use a loop to eliminate randomness.
+        (1 to 5).foreach { _ =>
+          assert(spark.read.format("avro").load(dir.getAbsolutePath).schema == schema)
+          checkAnswer(spark.read.format("avro").load(dir.getAbsolutePath), result)
+        }
+      }
+    }
+  }
+
+  test("Throws IOException on reading corrupt Avro file if flag IGNORE_CORRUPT_FILES disabled") {
+    withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+      withTempPath { dir =>
+        createDummyCorruptFile(dir)
+        val message = intercept[org.apache.spark.SparkException] {
+          spark.read.format("avro").load(dir.getAbsolutePath).schema
--- End diff --

`.schema` probably wouldn't be needed.
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22611#discussion_r222169458

--- Diff: external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala ---
@@ -342,6 +342,53 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils
+        val df = spark.read.format("avro").load(srcFile.getAbsolutePath)
+        val schema = df.schema
+        val result = df.collect()
+        // Schema inference picks random readable sample file.
+        // Here we use a loop to eliminate randomness.
--- End diff --

Actually, I don't think there is randomness in this test. Here, HDFS lists files in alphabetical order under the hood, although that's not guaranteed. I think the picking order in this test is at least deterministic.
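The determinism claim above can be illustrated in plain Scala with `java.nio.file`, no Spark or HDFS required (the file names mirror the ones used in the test; the explicit sort is an assumption standing in for an alphabetical HDFS listing):

```scala
// Illustration of the point above: in an alphabetical listing,
// "corrupt.avro" sorts before "episodes.avro", so the corrupt file would
// always be probed first -- the picking order is deterministic, not random.
import java.nio.file.Files

val dir = Files.createTempDirectory("avro-order").toFile
Seq("episodes.avro", "corrupt.avro").foreach { name =>
  Files.write(new java.io.File(dir, name).toPath, "dummy".getBytes("UTF-8"))
}

// java.io.File.listFiles makes no ordering guarantee, so sort explicitly
// to mimic the alphabetical order an HDFS listing typically yields.
val listed = dir.listFiles.map(_.getName).sorted.toList
println(listed)  // List(corrupt.avro, episodes.avro)
```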
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22611#discussion_r222167078

--- Diff: external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala ---
@@ -342,6 +342,53 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils
+  private def createDummyCorruptFile(dir: File): Unit = {
+    FileUtils.forceMkdir(dir)
+    val corruptFile = new File(dir, "corrupt.avro")
+    val writer = new BufferedWriter(new FileWriter(corruptFile))
+    writer.write("corrupt")
+    writer.close()
--- End diff --

ditto for `tryWithResource`
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22611#discussion_r222167036

--- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala ---
@@ -100,6 +77,50 @@ private[avro] class AvroFileFormat extends FileFormat
+    avroReader match {
+      case Some(reader) =>
+        try {
--- End diff --

ditto for `tryWithResource`
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22611#discussion_r222166950

--- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala ---
@@ -100,6 +77,50 @@ private[avro] class AvroFileFormat extends FileFormat
+      } else {
+        val in = new FsInput(path, conf)
--- End diff --

Not a big deal, but we can use `Utils.tryWithResource`.
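`Utils.tryWithResource`, suggested in several of the comments here, is a loan-pattern helper that closes a resource whether or not the body throws. A minimal standalone sketch in the spirit of that helper (this is not Spark's exact signature, and the `Probe` class is a hypothetical resource used only to demonstrate the close guarantee):

```scala
// Minimal loan-pattern helper in the spirit of Spark's Utils.tryWithResource:
// the resource is always closed, even when the body throws.
def tryWithResource[R <: AutoCloseable, T](createResource: => R)(f: R => T): T = {
  val resource = createResource
  try f(resource) finally resource.close()
}

// Hypothetical closable resource, used only to observe close() being called.
class Probe extends AutoCloseable {
  var closed = false
  override def close(): Unit = closed = true
}

// Normal path: the body's result is returned and the resource is closed.
val p = new Probe
val result = tryWithResource(p)(_ => 42)
println(result)    // 42
println(p.closed)  // true

// Failure path: close() still runs before the exception propagates.
val q = new Probe
try tryWithResource(q)(_ => throw new RuntimeException("boom"))
catch { case _: RuntimeException => () }
println(q.closed)  // true
```

Applied to the diff under review, this would replace the explicit `try`/`finally { in.close() }` around the `FsInput`, and the `writer.close()` in the test helper, with a single combinator.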
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22611#discussion_r222166498

--- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala ---
@@ -100,6 +77,50 @@ private[avro] class AvroFileFormat extends FileFormat
+  private def inferAvroSchemaFromFiles(
+      files: Seq[FileStatus],
+      conf: Configuration,
+      ignoreExtension: Boolean): Schema = {
+    val ignoreCorruptFiles = SQLConf.get.ignoreCorruptFiles
--- End diff --

How about matching it to `sparkSession.sessionState.conf.ignoreCorruptFiles` like other occurrences?
GitHub user gengliangwang opened a pull request: https://github.com/apache/spark/pull/22611

[SPARK-25595] Ignore corrupt Avro files if flag IGNORE_CORRUPT_FILES enabled

## What changes were proposed in this pull request?

With flag IGNORE_CORRUPT_FILES enabled, schema inference should ignore corrupt Avro files, which is consistent with the Parquet and ORC data sources.

## How was this patch tested?

Unit test

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/gengliangwang/spark ignoreCorruptAvro

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22611.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22611

commit e96ea209731acba240943f890773e6eb1d87dee8
Author: Gengliang Wang
Date: 2018-10-02T08:21:03Z

    Ignore corrupt Avro file if flag IGNORE_CORRUPT_FILES enabled