Github user witgo commented on the pull request:

    https://github.com/apache/spark/pull/1578#issuecomment-50232324
  
    Should a `FetchFailedException` also be thrown here?
    ```scala
        override def next(): (BlockId, Option[Iterator[Any]]) = {
          resultsGotten += 1
          val startFetchWait = System.currentTimeMillis()
          val result = results.take()
          val stopFetchWait = System.currentTimeMillis()
          _fetchWaitTime += (stopFetchWait - startFetchWait)
          if (!result.failed) bytesInFlight -= result.size
          while (!fetchRequests.isEmpty &&
            (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
            sendRequest(fetchRequests.dequeue())
          }
          (result.blockId, if (result.failed) None else Some(result.deserialize()))
        }
    ```
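
    The change being suggested could be sketched as follows: instead of returning `(blockId, None)` for a failed fetch, surface the failure eagerly by throwing. This is a minimal, self-contained model, not Spark's actual code — `FetchResult` and `SimpleFetchFailedException` here are stand-ins for Spark's internal result type and `FetchFailedException`:
    ```scala
    import scala.collection.mutable

    // Stand-in for Spark's internal fetch result; `failed` mirrors result.failed above.
    case class FetchResult(blockId: String, size: Long, failed: Boolean,
                           deserialize: () => Iterator[Any])

    // Hypothetical exception standing in for Spark's FetchFailedException.
    class SimpleFetchFailedException(blockId: String)
      extends Exception(s"Failed to fetch block $blockId")

    class FetchIterator(results: mutable.Queue[FetchResult])
        extends Iterator[(String, Iterator[Any])] {
      override def hasNext: Boolean = results.nonEmpty
      override def next(): (String, Iterator[Any]) = {
        val result = results.dequeue()
        // Instead of returning (blockId, None), fail fast on a failed fetch:
        if (result.failed) throw new SimpleFetchFailedException(result.blockId)
        (result.blockId, result.deserialize())
      }
    }
    ```
    The upside of this pattern is that callers cannot silently ignore a failed block; the downside is that every caller of `next()` must now handle the exception.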

