Repository: spark
Updated Branches:
  refs/heads/master 001acc446 -> 76386e1a2


[SPARK-4163][Core][WebUI] Send the fetch failure message back to Web UI

This is a PR to send the fetch failure message back to the Web UI.
Before:
![f1](https://cloud.githubusercontent.com/assets/1000778/4856595/1f036c80-60be-11e4-956f-335147fbccb7.png)
![f2](https://cloud.githubusercontent.com/assets/1000778/4856596/1f11cbea-60be-11e4-8fe9-9f9b2b35c884.png)

After (please ignore the content of the exception; I threw it directly in the
code because a real fetch failure is hard to simulate):
![e1](https://cloud.githubusercontent.com/assets/1000778/4856600/2657ea38-60be-11e4-9f2d-d56c5f900f10.png)
![e2](https://cloud.githubusercontent.com/assets/1000778/4856601/26595008-60be-11e4-912b-2744af786991.png)

Author: zsxwing <zsxw...@gmail.com>

Closes #3032 from zsxwing/SPARK-4163 and squashes the following commits:

f7e1faf [zsxwing] Discard changes for FetchFailedException and minor modification
4e946f7 [zsxwing] Add e as the cause of SparkException
316767d [zsxwing] Add private[storage] to FetchResult
d51b0b6 [zsxwing] Set e as the cause of FetchFailedException
b88c919 [zsxwing] Use 'private[storage]' for case classes instead of 'sealed'
62103fd [zsxwing] Update as per review
0c07d1f [zsxwing] Backward-compatible support
a3bca65 [zsxwing] Send the fetch failure message back to Web UI


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/76386e1a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/76386e1a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/76386e1a

Branch: refs/heads/master
Commit: 76386e1a23c55a58c0aeea67820aab2bac71b24b
Parents: 001acc4
Author: zsxwing <zsxw...@gmail.com>
Authored: Sun Nov 2 23:20:22 2014 -0800
Committer: Aaron Davidson <aa...@databricks.com>
Committed: Sun Nov 2 23:20:22 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/TaskEndReason.scala  |  6 +-
 .../apache/spark/scheduler/DAGScheduler.scala   |  4 +-
 .../org/apache/spark/scheduler/JobLogger.scala  |  2 +-
 .../spark/shuffle/FetchFailedException.scala    | 16 ++--
 .../shuffle/hash/BlockStoreShuffleFetcher.scala | 14 ++--
 .../storage/ShuffleBlockFetcherIterator.scala   | 82 +++++++++++++-------
 .../org/apache/spark/util/JsonProtocol.scala    |  7 +-
 .../scala/org/apache/spark/util/Utils.scala     |  2 +-
 .../spark/scheduler/DAGSchedulerSuite.scala     | 10 +--
 .../ShuffleBlockFetcherIteratorSuite.scala      |  8 +-
 .../ui/jobs/JobProgressListenerSuite.scala      |  2 +-
 .../apache/spark/util/JsonProtocolSuite.scala   |  4 +-
 12 files changed, 92 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/76386e1a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 202fba6..f45b463 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -69,11 +69,13 @@ case class FetchFailed(
     bmAddress: BlockManagerId,  // Note that bmAddress can be null
     shuffleId: Int,
     mapId: Int,
-    reduceId: Int)
+    reduceId: Int,
+    message: String)
   extends TaskFailedReason {
   override def toErrorString: String = {
     val bmAddressString = if (bmAddress == null) "null" else bmAddress.toString
-    s"FetchFailed($bmAddressString, shuffleId=$shuffleId, mapId=$mapId, 
reduceId=$reduceId)"
+    s"FetchFailed($bmAddressString, shuffleId=$shuffleId, mapId=$mapId, 
reduceId=$reduceId, " +
+      s"message=\n$message\n)"
   }
 }
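
The effect of the new field is easiest to see in the error string the UI renders. A minimal, hypothetical sketch (the values are made up; the three-argument BlockManagerId constructor matches its use in JsonProtocolSuite below):

    import org.apache.spark.FetchFailed
    import org.apache.spark.storage.BlockManagerId

    // Hypothetical values, purely to show the shape of the new output.
    val reason = FetchFailed(BlockManagerId("exec-1", "hostA", 12345),
      shuffleId = 1, mapId = 2, reduceId = 3,
      message = "java.io.IOException: Connection reset by peer")

    // Prints something like:
    //   FetchFailed(BlockManagerId(exec-1, hostA, 12345), shuffleId=1, mapId=2, reduceId=3, message=
    //   java.io.IOException: Connection reset by peer
    //   )
    println(reason.toErrorString)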
 

http://git-wip-us.apache.org/repos/asf/spark/blob/76386e1a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index af17b5d..96114c0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1053,7 +1053,7 @@ class DAGScheduler(
         logInfo("Resubmitted " + task + ", so marking it as still running")
         stage.pendingTasks += task
 
-      case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
+      case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) =>
         val failedStage = stageIdToStage(task.stageId)
         val mapStage = shuffleToMapStage(shuffleId)
 
@@ -1063,7 +1063,7 @@ class DAGScheduler(
         if (runningStages.contains(failedStage)) {
           logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
             s"due to a fetch failure from $mapStage (${mapStage.name})")
-          markStageAsFinished(failedStage, Some("Fetch failure"))
+          markStageAsFinished(failedStage, Some("Fetch failure: " + failureMessage))
           runningStages -= failedStage
         }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/76386e1a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index 54904bf..4e3d9de 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -215,7 +215,7 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
         taskStatus += " STATUS=RESUBMITTED TID=" + taskInfo.taskId +
                       " STAGE_ID=" + taskEnd.stageId
         stageLogInfo(taskEnd.stageId, taskStatus)
-      case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
+      case FetchFailed(bmAddress, shuffleId, mapId, reduceId, message) =>
         taskStatus += " STATUS=FETCHFAILED TID=" + taskInfo.taskId + " STAGE_ID=" +
                       taskEnd.stageId + " SHUFFLE_ID=" + shuffleId + " MAP_ID=" +
                       mapId + " REDUCE_ID=" + reduceId

http://git-wip-us.apache.org/repos/asf/spark/blob/76386e1a/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala b/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
index 71c08e9..0c1b6f4 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
@@ -19,6 +19,7 @@ package org.apache.spark.shuffle
 
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.{FetchFailed, TaskEndReason}
+import org.apache.spark.util.Utils
 
 /**
 * Failed to fetch a shuffle block. The executor catches this exception and propagates it
@@ -30,13 +31,11 @@ private[spark] class FetchFailedException(
     bmAddress: BlockManagerId,
     shuffleId: Int,
     mapId: Int,
-    reduceId: Int)
-  extends Exception {
-
-  override def getMessage: String =
-    "Fetch failed: %s %d %d %d".format(bmAddress, shuffleId, mapId, reduceId)
+    reduceId: Int,
+    message: String)
+  extends Exception(message) {
 
-  def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId)
+  def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId, message)
 }
 
 /**
@@ -46,7 +45,4 @@ private[spark] class MetadataFetchFailedException(
     shuffleId: Int,
     reduceId: Int,
     message: String)
-  extends FetchFailedException(null, shuffleId, -1, reduceId) {
-
-  override def getMessage: String = message
-}
+  extends FetchFailedException(null, shuffleId, -1, reduceId, message)
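
With the message now passed to the Exception constructor, getMessage and toTaskEndReason stay in sync automatically, so the getMessage overrides above become unnecessary. A small sketch of a hypothetical call site (note the class is private[spark], so this only compiles inside the org.apache.spark tree):

    import org.apache.spark.shuffle.FetchFailedException
    import org.apache.spark.storage.BlockManagerId

    // Hypothetical failure site: stringify the underlying cause and attach it.
    val cause = new java.io.IOException("Connection reset by peer")
    val ffe = new FetchFailedException(
      BlockManagerId("exec-1", "hostA", 12345), 1, 2, 3, cause.toString)

    assert(ffe.getMessage == cause.toString)  // supplied by Exception(message)
    val reason = ffe.toTaskEndReason          // FetchFailed carrying the same message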

http://git-wip-us.apache.org/repos/asf/spark/blob/76386e1a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
index f49917b..0d5247f 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
@@ -19,12 +19,13 @@ package org.apache.spark.shuffle.hash
 
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
+import scala.util.{Failure, Success, Try}
 
 import org.apache.spark._
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockFetcherIterator, ShuffleBlockId}
-import org.apache.spark.util.CompletionIterator
+import org.apache.spark.util.{CompletionIterator, Utils}
 
 private[hash] object BlockStoreShuffleFetcher extends Logging {
   def fetch[T](
@@ -52,21 +53,22 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
         (address, splits.map(s => (ShuffleBlockId(shuffleId, s._1, reduceId), s._2)))
     }
 
-    def unpackBlock(blockPair: (BlockId, Option[Iterator[Any]])) : Iterator[T] = {
+    def unpackBlock(blockPair: (BlockId, Try[Iterator[Any]])) : Iterator[T] = {
       val blockId = blockPair._1
       val blockOption = blockPair._2
       blockOption match {
-        case Some(block) => {
+        case Success(block) => {
           block.asInstanceOf[Iterator[T]]
         }
-        case None => {
+        case Failure(e) => {
           blockId match {
             case ShuffleBlockId(shufId, mapId, _) =>
               val address = statuses(mapId.toInt)._1
-              throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId)
+              throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId,
+                Utils.exceptionString(e))
            case _ =>
              throw new SparkException(
-                "Failed to get block " + blockId + ", which is not a shuffle block")
+                "Failed to get block " + blockId + ", which is not a shuffle block", e)
           }
         }
       }
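
The Option-to-Try switch is what lets the failure's cause reach this point at all: a Failure carries the original Throwable, where None carried nothing. A self-contained sketch of the same consumption pattern (generic names, not the actual helper):

    import scala.util.{Failure, Success, Try}

    // Sketch: consume (id, Try[data]) pairs; successes pass through, failures
    // rethrow with the original exception preserved as the cause.
    def unpack[T](pair: (String, Try[T])): T = pair match {
      case (_, Success(data)) => data
      case (id, Failure(e))   => throw new RuntimeException(s"Failed to fetch $id", e)
    }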

http://git-wip-us.apache.org/repos/asf/spark/blob/76386e1a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index ee89c7e..1e57918 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -20,6 +20,7 @@ package org.apache.spark.storage
 import java.util.concurrent.LinkedBlockingQueue
 
 import scala.collection.mutable.{ArrayBuffer, HashSet, Queue}
+import scala.util.{Failure, Success, Try}
 
 import org.apache.spark.{Logging, TaskContext}
 import org.apache.spark.network.BlockTransferService
@@ -55,7 +56,7 @@ final class ShuffleBlockFetcherIterator(
     blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])],
     serializer: Serializer,
     maxBytesInFlight: Long)
-  extends Iterator[(BlockId, Option[Iterator[Any]])] with Logging {
+  extends Iterator[(BlockId, Try[Iterator[Any]])] with Logging {
 
   import ShuffleBlockFetcherIterator._
 
@@ -118,16 +119,18 @@ final class ShuffleBlockFetcherIterator(
   private[this] def cleanup() {
     isZombie = true
     // Release the current buffer if necessary
-    if (currentResult != null && !currentResult.failed) {
-      currentResult.buf.release()
+    currentResult match {
+      case SuccessFetchResult(_, _, buf) => buf.release()
+      case _ =>
     }
 
     // Release buffers in the results queue
     val iter = results.iterator()
     while (iter.hasNext) {
       val result = iter.next()
-      if (!result.failed) {
-        result.buf.release()
+      result match {
+        case SuccessFetchResult(_, _, buf) => buf.release()
+        case _ =>
       }
     }
   }
@@ -151,7 +154,7 @@ final class ShuffleBlockFetcherIterator(
            // Increment the ref count because we need to pass this to a different thread.
             // This needs to be released after use.
             buf.retain()
-            results.put(new FetchResult(BlockId(blockId), sizeMap(blockId), buf))
+            results.put(new SuccessFetchResult(BlockId(blockId), sizeMap(blockId), buf))
             shuffleMetrics.remoteBytesRead += buf.size
             shuffleMetrics.remoteBlocksFetched += 1
           }
@@ -160,7 +163,7 @@ final class ShuffleBlockFetcherIterator(
 
         override def onBlockFetchFailure(blockId: String, e: Throwable): Unit = {
           logError(s"Failed to get block(s) from ${req.address.host}:${req.address.port}", e)
-          results.put(new FetchResult(BlockId(blockId), -1, null))
+          results.put(new FailureFetchResult(BlockId(blockId), e))
         }
       }
     )
@@ -231,12 +234,12 @@ final class ShuffleBlockFetcherIterator(
         val buf = blockManager.getBlockData(blockId)
         shuffleMetrics.localBlocksFetched += 1
         buf.retain()
-        results.put(new FetchResult(blockId, 0, buf))
+        results.put(new SuccessFetchResult(blockId, 0, buf))
       } catch {
         case e: Exception =>
           // If we see an exception, stop immediately.
           logError(s"Error occurred while fetching local blocks", e)
-          results.put(new FetchResult(blockId, -1, null))
+          results.put(new FailureFetchResult(blockId, e))
           return
       }
     }
@@ -267,15 +270,17 @@ final class ShuffleBlockFetcherIterator(
 
   override def hasNext: Boolean = numBlocksProcessed < numBlocksToFetch
 
-  override def next(): (BlockId, Option[Iterator[Any]]) = {
+  override def next(): (BlockId, Try[Iterator[Any]]) = {
     numBlocksProcessed += 1
     val startFetchWait = System.currentTimeMillis()
     currentResult = results.take()
     val result = currentResult
     val stopFetchWait = System.currentTimeMillis()
     shuffleMetrics.fetchWaitTime += (stopFetchWait - startFetchWait)
-    if (!result.failed) {
-      bytesInFlight -= result.size
+
+    result match {
+      case SuccessFetchResult(_, size, _) => bytesInFlight -= size
+      case _ =>
     }
     // Send fetch requests up to maxBytesInFlight
     while (fetchRequests.nonEmpty &&
@@ -283,20 +288,21 @@ final class ShuffleBlockFetcherIterator(
       sendRequest(fetchRequests.dequeue())
     }
 
-    val iteratorOpt: Option[Iterator[Any]] = if (result.failed) {
-      None
-    } else {
-      val is = blockManager.wrapForCompression(result.blockId, result.buf.createInputStream())
-      val iter = serializer.newInstance().deserializeStream(is).asIterator
-      Some(CompletionIterator[Any, Iterator[Any]](iter, {
-        // Once the iterator is exhausted, release the buffer and set currentResult to null
-        // so we don't release it again in cleanup.
-        currentResult = null
-        result.buf.release()
-      }))
+    val iteratorTry: Try[Iterator[Any]] = result match {
+      case FailureFetchResult(_, e) => Failure(e)
+      case SuccessFetchResult(blockId, _, buf) => {
+        val is = blockManager.wrapForCompression(blockId, buf.createInputStream())
+        val iter = serializer.newInstance().deserializeStream(is).asIterator
+        Success(CompletionIterator[Any, Iterator[Any]](iter, {
+          // Once the iterator is exhausted, release the buffer and set currentResult to null
+          // so we don't release it again in cleanup.
+          currentResult = null
+          buf.release()
+        }))
+      }
     }
 
-    (result.blockId, iteratorOpt)
+    (result.blockId, iteratorTry)
   }
 }
 
@@ -315,14 +321,30 @@ object ShuffleBlockFetcherIterator {
   }
 
   /**
-   * Result of a fetch from a remote block. A failure is represented as size == -1.
+   * Result of a fetch from a remote block.
+   */
+  private[storage] sealed trait FetchResult {
+    val blockId: BlockId
+  }
+
+  /**
+   * Result of a successful fetch of a remote block.
    * @param blockId block id
    * @param size estimated size of the block, used to calculate bytesInFlight.
-   *             Note that this is NOT the exact bytes. -1 if failure is present.
-   * @param buf [[ManagedBuffer]] for the content. null is error.
+   *             Note that this is NOT the exact bytes.
+   * @param buf [[ManagedBuffer]] for the content.
    */
-  case class FetchResult(blockId: BlockId, size: Long, buf: ManagedBuffer) {
-    def failed: Boolean = size == -1
-    if (failed) assert(buf == null) else assert(buf != null)
+  private[storage] case class SuccessFetchResult(blockId: BlockId, size: Long, buf: ManagedBuffer)
+    extends FetchResult {
+    require(buf != null)
+    require(size >= 0)
   }
+
+  /**
+   * Result of a failed fetch of a remote block.
+   * @param blockId block id
+   * @param e the failure exception
+   */
+  private[storage] case class FailureFetchResult(blockId: BlockId, e: Throwable)
+    extends FetchResult
 }
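
Replacing the size == -1 / buf == null sentinels with a sealed trait makes the illegal states unrepresentable and lets callers pattern-match instead of checking flags. A self-contained sketch of the same refactoring pattern (simplified names):

    // Before: one class with sentinel fields. After: one case per outcome.
    sealed trait Result { def id: String }
    case class Ok(id: String, size: Long) extends Result { require(size >= 0) }
    case class Err(id: String, e: Throwable) extends Result

    // Callers match on the case rather than testing size == -1.
    def bytesToCount(r: Result): Long = r match {
      case Ok(_, size) => size
      case Err(_, _)   => 0L
    }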

http://git-wip-us.apache.org/repos/asf/spark/blob/76386e1a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 43c7fba..f7ae1f7 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -279,7 +279,8 @@ private[spark] object JsonProtocol {
         ("Block Manager Address" -> blockManagerAddress) ~
         ("Shuffle ID" -> fetchFailed.shuffleId) ~
         ("Map ID" -> fetchFailed.mapId) ~
-        ("Reduce ID" -> fetchFailed.reduceId)
+        ("Reduce ID" -> fetchFailed.reduceId) ~
+        ("Message" -> fetchFailed.message)
       case exceptionFailure: ExceptionFailure =>
         val stackTrace = stackTraceToJson(exceptionFailure.stackTrace)
        val metrics = exceptionFailure.metrics.map(taskMetricsToJson).getOrElse(JNothing)
@@ -629,7 +630,9 @@ private[spark] object JsonProtocol {
         val shuffleId = (json \ "Shuffle ID").extract[Int]
         val mapId = (json \ "Map ID").extract[Int]
         val reduceId = (json \ "Reduce ID").extract[Int]
-        new FetchFailed(blockManagerAddress, shuffleId, mapId, reduceId)
+        val message = Utils.jsonOption(json \ "Message").map(_.extract[String])
+        new FetchFailed(blockManagerAddress, shuffleId, mapId, reduceId,
+          message.getOrElse("Unknown reason"))
       case `exceptionFailure` =>
         val className = (json \ "Class Name").extract[String]
         val description = (json \ "Description").extract[String]
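
Reading "Message" through an Option keeps old event logs, written without the field, loadable. A minimal json4s sketch of the same pattern (Utils.jsonOption is essentially JValue.toOption):

    import org.json4s._
    import org.json4s.jackson.JsonMethods._

    implicit val formats = DefaultFormats

    // Old logs lack the field; `\` yields JNothing, which toOption maps to None.
    val oldEvent = parse("""{"Reduce ID": 19}""")
    val message = (oldEvent \ "Message").toOption.map(_.extract[String])
    println(message.getOrElse("Unknown reason"))  // prints "Unknown reason"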

http://git-wip-us.apache.org/repos/asf/spark/blob/76386e1a/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index b402c5f..a33046d 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1597,7 +1597,7 @@ private[spark] object Utils extends Logging {
   }
 
  /** Return a nice string representation of the exception, including the stack trace. */
-  def exceptionString(e: Exception): String = {
+  def exceptionString(e: Throwable): String = {
    if (e == null) "" else exceptionString(getFormattedClassName(e), e.getMessage, e.getStackTrace)
   }
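
Widening the parameter from Exception to Throwable matters because fetch failures can surface as arbitrary Throwables, for example OutOfMemoryError, which is an Error rather than an Exception. A tiny illustrative sketch:

    // Throwable is the root of the JVM hierarchy; Exception excludes Errors.
    val t: Throwable = new OutOfMemoryError("demo")
    // exceptionString(t) compiles with the widened signature; with the old
    // Exception-typed parameter this would be a type error.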
 

http://git-wip-us.apache.org/repos/asf/spark/blob/76386e1a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index a2e4f71..819f956 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -431,7 +431,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     // the 2nd ResultTask failed
     complete(taskSets(1), Seq(
         (Success, 42),
-        (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0), null)))
+        (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), null)))
     // this will get called
     // blockManagerMaster.removeExecutor("exec-hostA")
     // ask the scheduler to try it again
@@ -461,7 +461,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     // The first result task fails, with a fetch failure for the output from the first mapper.
     runEvent(CompletionEvent(
       taskSets(1).tasks(0),
-      FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"),
       null,
       Map[Long, Any](),
       null,
@@ -472,7 +472,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     // The second ResultTask fails, with a fetch failure for the output from the second mapper.
     runEvent(CompletionEvent(
       taskSets(1).tasks(0),
-      FetchFailed(makeBlockManagerId("hostA"), shuffleId, 1, 1),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleId, 1, 1, "ignored"),
       null,
       Map[Long, Any](),
       null,
@@ -624,7 +624,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
         (Success, makeMapStatus("hostC", 1))))
     // fail the third stage because hostA went down
     complete(taskSets(2), Seq(
-        (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0), null)))
+        (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0, "ignored"), null)))
     // TODO assert this:
     // blockManagerMaster.removeExecutor("exec-hostA")
     // have DAGScheduler try again
@@ -655,7 +655,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
         (Success, makeMapStatus("hostB", 1))))
     // pretend stage 0 failed because hostA went down
     complete(taskSets(2), Seq(
-        (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0), null)))
+        (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0, "ignored"), null)))
     // TODO assert this:
     // blockManagerMaster.removeExecutor("exec-hostA")
     // DAGScheduler should notice the cached copy of the second shuffle and try to get it rerun.

http://git-wip-us.apache.org/repos/asf/spark/blob/76386e1a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index 28f7665..1eaabb9 100644
--- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -102,7 +102,7 @@ class ShuffleBlockFetcherIteratorSuite extends FunSuite {
     for (i <- 0 until 5) {
      assert(iterator.hasNext, s"iterator should have 5 elements but actually has $i elements")
       val (blockId, subIterator) = iterator.next()
-      assert(subIterator.isDefined,
+      assert(subIterator.isSuccess,
         s"iterator should have 5 elements defined but actually has $i 
elements")
 
       // Make sure we release the buffer once the iterator is exhausted.
@@ -230,8 +230,8 @@ class ShuffleBlockFetcherIteratorSuite extends FunSuite {
     sem.acquire()
 
    // The first block should be defined, and the last two are not defined (due to failure)
-    assert(iterator.next()._2.isDefined === true)
-    assert(iterator.next()._2.isDefined === false)
-    assert(iterator.next()._2.isDefined === false)
+    assert(iterator.next()._2.isSuccess)
+    assert(iterator.next()._2.isFailure)
+    assert(iterator.next()._2.isFailure)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/76386e1a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 6567c5a..2efbae6 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -115,7 +115,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     // Go through all the failure cases to make sure we are counting them as failures.
     val taskFailedReasons = Seq(
       Resubmitted,
-      new FetchFailed(null, 0, 0, 0),
+      new FetchFailed(null, 0, 0, 0, "ignored"),
       new ExceptionFailure("Exception", "description", null, None),
       TaskResultLost,
       TaskKilled,

http://git-wip-us.apache.org/repos/asf/spark/blob/76386e1a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index d235d7a..a91c9dd 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -107,7 +107,8 @@ class JsonProtocolSuite extends FunSuite {
     testJobResult(jobFailed)
 
     // TaskEndReason
-    val fetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15), 17, 18, 19)
+    val fetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15), 17, 18, 19,
+      "Some exception")
     val exceptionFailure = ExceptionFailure("To be", "or not to be", stackTrace, None)
     testTaskEndReason(Success)
     testTaskEndReason(Resubmitted)
@@ -396,6 +397,7 @@ class JsonProtocolSuite extends FunSuite {
         assert(r1.mapId === r2.mapId)
         assert(r1.reduceId === r2.reduceId)
         assertEquals(r1.bmAddress, r2.bmAddress)
+        assert(r1.message === r2.message)
       case (r1: ExceptionFailure, r2: ExceptionFailure) =>
         assert(r1.className === r2.className)
         assert(r1.description === r2.description)

