otterc commented on a change in pull request #32140:
URL: https://github.com/apache/spark/pull/32140#discussion_r660698644
##########
File path:
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -1124,4 +1403,67 @@ object ShuffleBlockFetcherIterator {
*/
private[storage]
case class DeferFetchRequestResult(fetchRequest: FetchRequest) extends
FetchResult
+
+ /**
+ * Result of an un-successful fetch of either of these:
+ * 1) Remote shuffle chunk.
+ * 2) Local push-merged block.
+ *
+ * Instead of treating this as a [[FailureFetchResult]], we fallback to
fetch the original blocks.
+ *
+ * @param blockId block id
+ * @param address BlockManager that the push-merged block was attempted to
be fetched from
+ * @param size size of the block, used to update bytesInFlight.
+ * @param isNetworkReqDone Is this the last network request for this host in
this fetch
+ * request. Used to update reqsInFlight.
+ */
+ private[storage] case class FallbackOnPushMergedFailureResult(blockId:
BlockId,
+ address: BlockManagerId,
+ size: Long,
+ isNetworkReqDone: Boolean) extends FetchResult
+
+ /**
+ * Result of a successful fetch of meta information for a remote push-merged
block.
+ *
+ * @param shuffleId shuffle id.
+ * @param reduceId reduce id.
+ * @param blockSize size of each push-merged block.
+ * @param bitmaps bitmaps for every chunk.
+ * @param address BlockManager that the meta was fetched from.
+ */
+ private[storage] case class PushMergedRemoteMetaFetchResult(
+ shuffleId: Int,
+ reduceId: Int,
+ blockSize: Long,
+ bitmaps: Array[RoaringBitmap],
+ address: BlockManagerId,
+ blockId: BlockId = DUMMY_SHUFFLE_BLOCK_ID) extends FetchResult
+
+ /**
+ * Result of a failure while fetching the meta information for a remote
push-merged block.
+ *
+ * @param shuffleId shuffle id.
+ * @param reduceId reduce id.
+ * @param address BlockManager that the meta was fetched from.
+ */
+ private[storage] case class PushMergedRemoteMetaFailedFetchResult(
+ shuffleId: Int,
+ reduceId: Int,
+ address: BlockManagerId,
+ blockId: BlockId = DUMMY_SHUFFLE_BLOCK_ID) extends FetchResult
+
+ /**
+ * Result of a successful fetch of meta information for a local push-merged
block.
+ *
+ * @param shuffleId shuffle id.
+ * @param reduceId reduce id.
+ * @param bitmaps bitmaps for every chunk.
+ * @param localDirs local directories where the push-merged shuffle files
are storedl
+ */
+ private[storage] case class PushMergedLocalMetaFetchResult(
+ shuffleId: Int,
+ reduceId: Int,
+ bitmaps: Array[RoaringBitmap],
+ localDirs: Array[String],
+ blockId: BlockId = DUMMY_SHUFFLE_BLOCK_ID) extends FetchResult
Review comment:
Didn't realize that the `FetchResult` was changed. Earlier it had a
`blockId` field which is why I was using `DUMMY_SHUFFLE_BLOCK_ID`. Will change
it. Thanks for pointing out.
```
private[storage] sealed trait FetchResult {
val blockId: BlockId
val address: BlockManagerId
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]