otterc commented on a change in pull request #32140:
URL: https://github.com/apache/spark/pull/32140#discussion_r648845832
##########
File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -1124,4 +1392,298 @@ object ShuffleBlockFetcherIterator {
*/
private[storage]
case class DeferFetchRequestResult(fetchRequest: FetchRequest) extends
FetchResult
+
+ /**
+ * Result of an unsuccessful fetch of a remote merged block.
+ * Instead of treating this as a FailureFetchResult, we ignore this failure
+ * and fall back to fetching the original unmerged blocks.
+ * @param blockId block id
+ * @param address BlockManager that the merged block was attempted to be fetched from
+ * @param size size of the block, used to update bytesInFlight.
+ * @param isNetworkReqDone Is this the last network request for this host in this fetch
+ * request. Used to update reqsInFlight.
+ */
+ private[storage] case class IgnoreFetchResult(blockId: BlockId,
+ address: BlockManagerId,
+ size: Long,
+ isNetworkReqDone: Boolean) extends FetchResult
+
+ /**
+ * Result of a successful fetch of meta information for a merged block.
+ *
+ * @param shuffleId shuffle id.
+ * @param reduceId reduce id.
+ * @param blockSize size of each merged block.
+ * @param numChunks number of chunks in the merged block.
+ * @param bitmaps bitmaps for every chunk.
+ * @param address BlockManager that the merged status was fetched from.
+ */
+ private[storage] case class MergedBlocksMetaFetchResult(
+ shuffleId: Int,
+ reduceId: Int,
+ blockSize: Long,
+ numChunks: Int,
+ bitmaps: Array[RoaringBitmap],
+ address: BlockManagerId,
+ blockId: BlockId = DUMMY_SHUFFLE_BLOCK_ID) extends FetchResult
+
+ /**
+ * Result of a failure while fetching the meta information for a merged block.
+ *
+ * @param shuffleId shuffle id.
+ * @param reduceId reduce id.
+ * @param address BlockManager that the merged status was fetched from.
+ */
+ private[storage] case class MergedBlocksMetaFailedFetchResult(
+ shuffleId: Int,
+ reduceId: Int,
+ address: BlockManagerId,
+ blockId: BlockId = DUMMY_SHUFFLE_BLOCK_ID) extends FetchResult
+}
+
+/**
+ * Helper class that encapsulates all the push-based functionality to fetch merged block meta
+ * and merged shuffle block chunks.
+ */
+private class PushBasedFetchHelper(
Review comment:
A lot of methods in `PushBasedFetchHelper` also need access to the iterator instance. The helper has to work with the iterator to:
1. add results to the iterator's `results` queue when it receives the meta response;
2. update the number of blocks to fetch;
3. fetch fallback blocks when there is a fallback, which in turn removes some pending blocks from `fetchRequests`.

It also needs access to `shuffleClient`, `blockManager`, and `mapOutputTracker`, and most of the methods in this class use one or more of these instances.
IMO, it seems better to create one instance of `PushBasedFetchHelper` per iterator instance. Otherwise, every method of `PushBasedFetchHelper` would need many more arguments.
I find this class similar to the existing `BufferReleasingInputStream` in the iterator, which likewise holds a reference to the iterator.
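To make the trade-off concrete, here is a minimal sketch of the one-helper-per-iterator design, assuming heavily simplified types. Everything in it is hypothetical: `SimpleFetcherIterator`, `SimplePushHelper`, `MergedMetaResult`, the `shuffleChunk_0_<reduceId>_<chunk>` block-id format, and the plain `mutable.Queue`s are illustrative stand-ins, not Spark's actual classes or fields. The helper receives the iterator once as a constructor argument, so each of the three operations listed above takes only its own inputs.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins for Spark's FetchRequest/FetchResult types.
final case class FetchRequest(blockIds: Seq[String])

sealed trait FetchResult
final case class MergedMetaResult(reduceId: Int, numChunks: Int) extends FetchResult

// Simplified iterator: owns the mutable state the helper must read and update.
class SimpleFetcherIterator(initial: Seq[FetchRequest]) {
  val results = mutable.Queue[FetchResult]()
  val fetchRequests = mutable.Queue[FetchRequest](initial: _*)
  var numBlocksToFetch: Int = initial.map(_.blockIds.size).sum

  // One helper per iterator instance, created with a back-reference to it.
  val pushHelper = new SimplePushHelper(this)
}

class SimplePushHelper(iterator: SimpleFetcherIterator) {
  // Hypothetical block-id prefix for the chunks of one merged block.
  private def chunkPrefix(reduceId: Int): String = s"shuffleChunk_0_${reduceId}_"

  // Points 1 and 2: enqueue the meta result, schedule one fetch per chunk,
  // and grow the count of blocks still to fetch.
  def onMetaReceived(reduceId: Int, numChunks: Int): Unit = {
    iterator.results.enqueue(MergedMetaResult(reduceId, numChunks))
    val chunks = (0 until numChunks).map(i => chunkPrefix(reduceId) + i)
    iterator.fetchRequests.enqueue(FetchRequest(chunks))
    iterator.numBlocksToFetch += numChunks
  }

  // Point 3: drop pending chunk fetches of the merged block and schedule
  // the original unmerged blocks instead.
  def fallback(reduceId: Int, originalBlocks: Seq[String]): Unit = {
    val prefix = chunkPrefix(reduceId)
    var removed = 0
    val remaining = iterator.fetchRequests.toList.flatMap { req =>
      val (chunks, others) = req.blockIds.partition(_.startsWith(prefix))
      removed += chunks.size
      if (others.isEmpty) None else Some(FetchRequest(others))
    }
    iterator.fetchRequests.clear()
    iterator.fetchRequests ++= remaining
    iterator.fetchRequests.enqueue(FetchRequest(originalBlocks))
    iterator.numBlocksToFetch += originalBlocks.size - removed
  }
}

object PushHelperSketch {
  def main(args: Array[String]): Unit = {
    val it = new SimpleFetcherIterator(Seq(FetchRequest(Seq("shuffle_0_1_7"))))
    it.pushHelper.onMetaReceived(reduceId = 5, numChunks = 2)
    println(it.numBlocksToFetch) // 3: one regular block plus two chunks
    it.pushHelper.fallback(reduceId = 5, Seq("shuffle_0_3_5", "shuffle_0_4_5"))
    println(it.numBlocksToFetch) // 3: two chunks replaced by two original blocks
    println(it.fetchRequests.toList)
  }
}
```

Without the back-reference, each helper method would need the results queue, the pending-request queue, and a way to update the block count passed in explicitly, and in the real class `shuffleClient`, `blockManager`, and `mapOutputTracker` as well.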