Repository: spark
Updated Branches:
  refs/heads/branch-2.1 fcd22e538 -> 1cafc76ea


[SPARK-18774][CORE][SQL] Ignore non-existing files when ignoreCorruptFiles is enabled (branch 2.1)

## What changes were proposed in this pull request?

Backport #16203 to branch 2.1.
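
(For context, not part of the patch itself: once this lands, a user would opt in roughly as below. The session setup, app name, and path are illustrative.)

```scala
// Hedged usage sketch: enable both flags touched by this backport (both default to false).
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("ignore-corrupt-files-demo")                  // hypothetical app name
  .config("spark.files.ignoreCorruptFiles", "true")      // core reads: HadoopRDD / NewHadoopRDD
  .config("spark.sql.files.ignoreCorruptFiles", "true")  // SQL file sources: FileScanRDD
  .getOrCreate()

// With the flags on, a corrupted or deleted input file is logged and skipped instead of
// failing the job; records read before the failure are still returned.
val lines = spark.sparkContext.textFile("/data/logs/")   // hypothetical path
```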

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #16216 from zsxwing/SPARK-18774-2.1.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1cafc76e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1cafc76e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1cafc76e

Branch: refs/heads/branch-2.1
Commit: 1cafc76ea1e9eef40b24060d1cd7c4aaf9f16a49
Parents: fcd22e5
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Thu Dec 8 17:58:44 2016 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Thu Dec 8 17:58:44 2016 -0800

----------------------------------------------------------------------
 .../apache/spark/internal/config/package.scala  |  3 +-
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  | 30 +++++++-----
 .../org/apache/spark/rdd/NewHadoopRDD.scala     | 50 ++++++++++++--------
 .../sql/execution/datasources/FileScanRDD.scala |  3 ++
 .../org/apache/spark/sql/internal/SQLConf.scala |  3 +-
 5 files changed, 57 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1cafc76e/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 4a3e3d5..8ce9883 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -203,7 +203,8 @@ package object config {
 
   private[spark] val IGNORE_CORRUPT_FILES = ConfigBuilder("spark.files.ignoreCorruptFiles")
     .doc("Whether to ignore corrupt files. If true, the Spark jobs will continue to run when " +
-      "encountering corrupt files and contents that have been read will still be returned.")
+      "encountering corrupted or non-existing files and contents that have been read will still " +
+      "be returned.")
     .booleanConf
     .createWithDefault(false)
 
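As a side note on how the entry above gets used: Spark-internal code reads the typed `IGNORE_CORRUPT_FILES` entry (it is `private[spark]`), but the same flag is visible through the public `SparkConf` API, as in this hedged sketch:

```scala
// Public-API sketch only; inside Spark the RDDs resolve the typed ConfigEntry instead.
import org.apache.spark.SparkConf

val conf = new SparkConf().set("spark.files.ignoreCorruptFiles", "true")
val ignoreCorruptFiles: Boolean =
  conf.getBoolean("spark.files.ignoreCorruptFiles", defaultValue = false)
```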

http://git-wip-us.apache.org/repos/asf/spark/blob/1cafc76e/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 3133a28..b56ebf4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -210,12 +210,12 @@ class HadoopRDD[K, V](
   override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
     val iter = new NextIterator[(K, V)] {
 
-      val split = theSplit.asInstanceOf[HadoopPartition]
+      private val split = theSplit.asInstanceOf[HadoopPartition]
       logInfo("Input split: " + split.inputSplit)
-      val jobConf = getJobConf()
+      private val jobConf = getJobConf()
 
-      val inputMetrics = context.taskMetrics().inputMetrics
-      val existingBytesRead = inputMetrics.bytesRead
+      private val inputMetrics = context.taskMetrics().inputMetrics
+      private val existingBytesRead = inputMetrics.bytesRead
 
       // Sets the thread local variable for the file's name
       split.inputSplit.value match {
@@ -225,7 +225,7 @@ class HadoopRDD[K, V](
 
       // Find a function that will return the FileSystem bytes read by this thread. Do this before
       // creating RecordReader, because RecordReader's constructor might read some bytes
-      val getBytesReadCallback: Option[() => Long] = split.inputSplit.value match {
+      private val getBytesReadCallback: Option[() => Long] = split.inputSplit.value match {
         case _: FileSplit | _: CombineFileSplit =>
           SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
         case _ => None
@@ -235,23 +235,31 @@ class HadoopRDD[K, V](
       // If we do a coalesce, however, we are likely to compute multiple partitions in the same
       // task and in the same thread, in which case we need to avoid override values written by
       // previous partitions (SPARK-13071).
-      def updateBytesRead(): Unit = {
+      private def updateBytesRead(): Unit = {
         getBytesReadCallback.foreach { getBytesRead =>
           inputMetrics.setBytesRead(existingBytesRead + getBytesRead())
         }
       }
 
-      var reader: RecordReader[K, V] = null
-      val inputFormat = getInputFormat(jobConf)
+      private var reader: RecordReader[K, V] = null
+      private val inputFormat = getInputFormat(jobConf)
       HadoopRDD.addLocalConfiguration(
         new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(createTime),
         context.stageId, theSplit.index, context.attemptNumber, jobConf)
-      reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
 
+      reader =
+        try {
+          inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
+        } catch {
+          case e: IOException if ignoreCorruptFiles =>
+            logWarning(s"Skipped the rest content in the corrupted file: ${split.inputSplit}", e)
+            finished = true
+            null
+        }
       // Register an on-task-completion callback to close the input stream.
       context.addTaskCompletionListener{ context => closeIfNeeded() }
-      val key: K = reader.createKey()
-      val value: V = reader.createValue()
+      private val key: K = if (reader == null) null.asInstanceOf[K] else reader.createKey()
+      private val value: V = if (reader == null) null.asInstanceOf[V] else reader.createValue()
 
       override def getNext(): (K, V) = {
         try {

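The change above boils down to tolerating an IOException at RecordReader creation time. A self-contained, non-Spark sketch of that control flow (all names below are illustrative):

```scala
import java.io.{BufferedReader, FileReader, IOException}

// Files that are missing (FileNotFoundException is an IOException) or otherwise unreadable
// leave the iterator empty when ignoreCorruptFiles is true; otherwise the exception propagates.
class TolerantLineIterator(path: String, ignoreCorruptFiles: Boolean) extends Iterator[String] {

  private var finished = false

  private val reader: BufferedReader =
    try {
      new BufferedReader(new FileReader(path))
    } catch {
      case e: IOException if ignoreCorruptFiles =>
        println(s"Skipping unreadable file: $path (${e.getMessage})")
        finished = true
        null
    }

  // Resource cleanup is omitted for brevity; Spark closes its reader through a
  // task-completion callback (the closeIfNeeded() listener in the hunk above).
  private var nextLine: String = if (finished) null else reader.readLine()

  override def hasNext: Boolean = !finished && nextLine != null

  override def next(): String = {
    val line = nextLine
    nextLine = reader.readLine()
    if (nextLine == null) finished = true
    line
  }
}
```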
http://git-wip-us.apache.org/repos/asf/spark/blob/1cafc76e/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index c6ddb4b..6168d97 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -132,12 +132,12 @@ class NewHadoopRDD[K, V](
 
   override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
     val iter = new Iterator[(K, V)] {
-      val split = theSplit.asInstanceOf[NewHadoopPartition]
+      private val split = theSplit.asInstanceOf[NewHadoopPartition]
       logInfo("Input split: " + split.serializableHadoopSplit)
-      val conf = getConf
+      private val conf = getConf
 
-      val inputMetrics = context.taskMetrics().inputMetrics
-      val existingBytesRead = inputMetrics.bytesRead
+      private val inputMetrics = context.taskMetrics().inputMetrics
+      private val existingBytesRead = inputMetrics.bytesRead
 
       // Sets the thread local variable for the file's name
       split.serializableHadoopSplit.value match {
@@ -147,39 +147,51 @@ class NewHadoopRDD[K, V](
 
       // Find a function that will return the FileSystem bytes read by this thread. Do this before
       // creating RecordReader, because RecordReader's constructor might read some bytes
-      val getBytesReadCallback: Option[() => Long] = split.serializableHadoopSplit.value match {
-        case _: FileSplit | _: CombineFileSplit =>
-          SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
-        case _ => None
-      }
+      private val getBytesReadCallback: Option[() => Long] =
+        split.serializableHadoopSplit.value match {
+          case _: FileSplit | _: CombineFileSplit =>
+            SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
+          case _ => None
+        }
 
       // For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics.
       // If we do a coalesce, however, we are likely to compute multiple partitions in the same
       // task and in the same thread, in which case we need to avoid override values written by
       // previous partitions (SPARK-13071).
-      def updateBytesRead(): Unit = {
+      private def updateBytesRead(): Unit = {
         getBytesReadCallback.foreach { getBytesRead =>
           inputMetrics.setBytesRead(existingBytesRead + getBytesRead())
         }
       }
 
-      val format = inputFormatClass.newInstance
+      private val format = inputFormatClass.newInstance
       format match {
         case configurable: Configurable =>
           configurable.setConf(conf)
         case _ =>
       }
-      val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
-      val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
-      private var reader = format.createRecordReader(
-        split.serializableHadoopSplit.value, hadoopAttemptContext)
-      reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
+      private val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
+      private val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
+      private var finished = false
+      private var reader =
+        try {
+          val _reader = format.createRecordReader(
+            split.serializableHadoopSplit.value, hadoopAttemptContext)
+          _reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
+          _reader
+        } catch {
+          case e: IOException if ignoreCorruptFiles =>
+            logWarning(
+              s"Skipped the rest content in the corrupted file: ${split.serializableHadoopSplit}",
+              e)
+            finished = true
+            null
+        }
 
       // Register an on-task-completion callback to close the input stream.
       context.addTaskCompletionListener(context => close())
-      var havePair = false
-      var finished = false
-      var recordsSinceMetricsUpdate = 0
+      private var havePair = false
+      private var recordsSinceMetricsUpdate = 0
 
       override def hasNext: Boolean = {
         if (!finished && !havePair) {

http://git-wip-us.apache.org/repos/asf/spark/blob/1cafc76e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 237cdab..69338f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -150,6 +150,9 @@ class FileScanRDD(
               currentIterator = readFunction(currentFile)
             }
           } catch {
+            case e: IOException if ignoreCorruptFiles =>
+              logWarning(s"Skipped the rest content in the corrupted file: $currentFile", e)
+              currentIterator = Iterator.empty
             case e: java.io.FileNotFoundException =>
               throw new java.io.FileNotFoundException(
                 e.getMessage + "\n" +

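Ordering matters in the catch block above: `java.io.FileNotFoundException` is a subclass of `IOException`, so with `ignoreCorruptFiles` enabled a non-existing file is swallowed by the new guarded case, while with the flag disabled it still reaches the existing FileNotFoundException handler. A minimal, non-Spark sketch of that ordering:

```scala
import java.io.{FileNotFoundException, IOException}

// Illustrative only: mirrors the case ordering in the FileScanRDD hunk above.
def openOrSkip(open: () => Iterator[String], ignoreCorruptFiles: Boolean): Iterator[String] =
  try {
    open()
  } catch {
    // Checked first: with the flag on, a missing file (FileNotFoundException <: IOException)
    // lands here and simply yields no rows.
    case e: IOException if ignoreCorruptFiles =>
      Iterator.empty
    // With the flag off, a missing file still surfaces as an error
    // (the real code appends a longer hint to the message here).
    case e: FileNotFoundException =>
      throw new FileNotFoundException(e.getMessage)
  }
```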
http://git-wip-us.apache.org/repos/asf/spark/blob/1cafc76e/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 0280a3b..809b267 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -606,7 +606,8 @@ object SQLConf {
 
   val IGNORE_CORRUPT_FILES = SQLConfigBuilder("spark.sql.files.ignoreCorruptFiles")
     .doc("Whether to ignore corrupt files. If true, the Spark jobs will continue to run when " +
-      "encountering corrupt files and contents that have been read will still be returned.")
+      "encountering corrupted or non-existing files and contents that have been read will still " +
+      "be returned.")
     .booleanConf
     .createWithDefault(false)
 
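And the SQL-side counterpart in use: the flag above can also be flipped at runtime on an existing session, as in this hedged sketch (session setup and path are made up):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("sql-ignore-corrupt-demo").master("local[*]").getOrCreate()

// Runtime toggle; equivalent to the SQL statement: SET spark.sql.files.ignoreCorruptFiles=true
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

// Missing or corrupted part-files under the (hypothetical) path are skipped with a warning
// instead of failing the query; rows read successfully are still returned.
val count = spark.read.parquet("/data/events/").count()
```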

