Repository: spark Updated Branches: refs/heads/branch-1.6 d3890deb7 -> 585c5657f
[SPARK-17850][CORE] Add a flag to ignore corrupt files (branch 1.6) ## What changes were proposed in this pull request? This is the patch for branch 1.6. It only adds the Spark conf `spark.files.ignoreCorruptFiles`, because SQL just uses HadoopRDD directly in 1.6. `spark.files.ignoreCorruptFiles` is `true` by default. ## How was this patch tested? By the added test. Author: Shixiong Zhu <shixi...@databricks.com> Closes #15454 from zsxwing/SPARK-17850-1.6. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/585c5657 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/585c5657 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/585c5657 Branch: refs/heads/branch-1.6 Commit: 585c5657f9452b7a1f4f6c9c0a9d933ebb4ed7b0 Parents: d3890de Author: Shixiong Zhu <shixi...@databricks.com> Authored: Thu Oct 13 00:33:00 2016 -0700 Committer: Mridul Muralidharan <mmuralidharan@HW11853.local> Committed: Thu Oct 13 00:33:00 2016 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/rdd/HadoopRDD.scala | 6 +- .../org/apache/spark/rdd/NewHadoopRDD.scala | 10 +++- .../test/scala/org/apache/spark/FileSuite.scala | 62 +++++++++++++++++++- 3 files changed, 74 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/585c5657/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index f37c95b..463dd5b 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -139,6 +139,9 @@ class HadoopRDD[K, V]( private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false) + private val 
ignoreCorruptFiles = + sparkContext.conf.getBoolean("spark.files.ignoreCorruptFiles", true) + // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads. protected def getJobConf(): JobConf = { val conf: Configuration = broadcastedConf.value.value @@ -245,8 +248,7 @@ class HadoopRDD[K, V]( try { finished = !reader.next(key, value) } catch { - case eof: EOFException => - finished = true + case _: EOFException if ignoreCorruptFiles => finished = true } if (!finished) { inputMetrics.incRecordsRead(1) http://git-wip-us.apache.org/repos/asf/spark/blob/585c5657/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index 46fe1ba..5b5ddd5 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -17,6 +17,7 @@ package org.apache.spark.rdd +import java.io.EOFException import java.text.SimpleDateFormat import java.util.Date @@ -84,6 +85,9 @@ class NewHadoopRDD[K, V]( private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false) + private val ignoreCorruptFiles = + sparkContext.conf.getBoolean("spark.files.ignoreCorruptFiles", true) + def getConf: Configuration = { val conf: Configuration = confBroadcast.value.value if (shouldCloneJobConf) { @@ -171,7 +175,11 @@ class NewHadoopRDD[K, V]( override def hasNext: Boolean = { if (!finished && !havePair) { - finished = !reader.nextKeyValue + try { + finished = !reader.nextKeyValue + } catch { + case _: EOFException if ignoreCorruptFiles => finished = true + } if (finished) { // Close and release the reader here; close() will also be called when the task // completes, but for tasks that read from many files, it helps to release the 
http://git-wip-us.apache.org/repos/asf/spark/blob/585c5657/core/src/test/scala/org/apache/spark/FileSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala index fdb00aa..7e87092 100644 --- a/core/src/test/scala/org/apache/spark/FileSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileSuite.scala @@ -17,7 +17,8 @@ package org.apache.spark -import java.io.{File, FileWriter} +import java.io._ +import java.util.zip.GZIPOutputStream import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.input.PortableDataStream @@ -540,4 +541,63 @@ class FileSuite extends SparkFunSuite with LocalSparkContext { }.collect() assert(inputPaths.toSet === Set(s"$outDir/part-00000", s"$outDir/part-00001")) } + + test("spark.files.ignoreCorruptFiles should work both HadoopRDD and NewHadoopRDD") { + val inputFile = File.createTempFile("input-", ".gz") + try { + // Create a corrupt gzip file + val byteOutput = new ByteArrayOutputStream() + val gzip = new GZIPOutputStream(byteOutput) + try { + gzip.write(Array[Byte](1, 2, 3, 4)) + } finally { + gzip.close() + } + val bytes = byteOutput.toByteArray + val o = new FileOutputStream(inputFile) + try { + // It's corrupt since we only write half of bytes into the file. 
+ o.write(bytes.take(bytes.length / 2)) + } finally { + o.close() + } + + // Spark job should ignore corrupt files by default + sc = new SparkContext("local", "test") + // Test HadoopRDD + assert(sc.textFile(inputFile.toURI.toString).collect().isEmpty) + // Test NewHadoopRDD + assert { + sc.newAPIHadoopFile( + inputFile.toURI.toString, + classOf[NewTextInputFormat], + classOf[LongWritable], + classOf[Text]).collect().isEmpty + } + sc.stop() + + // Reading a corrupt gzip file should throw EOFException + val conf = new SparkConf().set("spark.files.ignoreCorruptFiles", "false") + sc = new SparkContext("local", "test", conf) + // Test HadoopRDD + var e = intercept[SparkException] { + sc.textFile(inputFile.toURI.toString).collect() + } + assert(e.getCause.isInstanceOf[EOFException]) + assert(e.getCause.getMessage === "Unexpected end of input stream") + // Test NewHadoopRDD + e = intercept[SparkException] { + sc.newAPIHadoopFile( + inputFile.toURI.toString, + classOf[NewTextInputFormat], + classOf[LongWritable], + classOf[Text]).collect() + } + assert(e.getCause.isInstanceOf[EOFException]) + assert(e.getCause.getMessage === "Unexpected end of input stream") + } finally { + inputFile.delete() + } + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org