mridulm commented on a change in pull request #32389:
URL: https://github.com/apache/spark/pull/32389#discussion_r632825877
##########
File path:
core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -123,6 +131,42 @@ class ShuffleBlockFetcherIteratorSuite extends
SparkFunSuite with PrivateMethodT
verify(wrappedInputStream.invokePrivate(delegateAccess()),
times(1)).close()
}
+ // scalastyle:off argcount
+ private def createShuffleBlockIteratorWithDefaults(
+ blocksByAddress: Map[BlockManagerId, (Traversable[BlockId], Long, Int)],
+ taskContext: Option[TaskContext] = None,
+ streamWrapperLimitSize: Option[Long] = None,
+ blockManager: Option[BlockManager] = None,
+ maxBytesInFlight: Long = Long.MaxValue,
+ maxReqsInFlight: Int = Int.MaxValue,
+ maxBlocksInFlightPerAddress: Int = Int.MaxValue,
+ maxReqSizeShuffleToMem: Int = Int.MaxValue,
+ detectCorrupt: Boolean = true,
+ detectCorruptUseExtraMemory: Boolean = true,
+ shuffleMetrics: Option[ShuffleReadMetricsReporter] = None,
+ doBatchFetch: Boolean = false): ShuffleBlockFetcherIterator = {
+ val tContext = taskContext.getOrElse(TaskContext.empty())
+ new ShuffleBlockFetcherIterator(
+ tContext,
+ transfer,
+ blockManager.getOrElse(createMockBlockManager()),
+ blocksByAddress.map { case (blockManagerId, (blocks, blockSize,
blockMapIndex)) =>
+ (blockManagerId, blocks.map(blockId => (blockId, blockSize,
blockMapIndex)).toSeq)
+ }.toIterator,
+ streamWrapperLimitSize
+ .map(limit => (_: BlockId, in: InputStream) => new
LimitedInputStream(in, limit))
+ .getOrElse((_: BlockId, in: InputStream) => in),
Review comment:
super nit:
```suggestion
(_, in) => new LimitedInputStream(in,
streamWrapperLimitSize.getOrElse(Long.MaxValue))
```
##########
File path:
core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -123,6 +131,42 @@ class ShuffleBlockFetcherIteratorSuite extends
SparkFunSuite with PrivateMethodT
verify(wrappedInputStream.invokePrivate(delegateAccess()),
times(1)).close()
}
+ // scalastyle:off argcount
+ private def createShuffleBlockIteratorWithDefaults(
+ blocksByAddress: Map[BlockManagerId, (Traversable[BlockId], Long, Int)],
+ taskContext: Option[TaskContext] = None,
+ streamWrapperLimitSize: Option[Long] = None,
+ blockManager: Option[BlockManager] = None,
+ maxBytesInFlight: Long = Long.MaxValue,
+ maxReqsInFlight: Int = Int.MaxValue,
+ maxBlocksInFlightPerAddress: Int = Int.MaxValue,
+ maxReqSizeShuffleToMem: Int = Int.MaxValue,
+ detectCorrupt: Boolean = true,
+ detectCorruptUseExtraMemory: Boolean = true,
+ shuffleMetrics: Option[ShuffleReadMetricsReporter] = None,
+ doBatchFetch: Boolean = false): ShuffleBlockFetcherIterator = {
+ val tContext = taskContext.getOrElse(TaskContext.empty())
+ new ShuffleBlockFetcherIterator(
+ tContext,
+ transfer,
+ blockManager.getOrElse(createMockBlockManager()),
+ blocksByAddress.map { case (blockManagerId, (blocks, blockSize,
blockMapIndex)) =>
+ (blockManagerId, blocks.map(blockId => (blockId, blockSize,
blockMapIndex)).toSeq)
+ }.toIterator,
Review comment:
We are making the assumption that all blocks are of the same size for a
given block manager ...
This is currently the way the code happens to be written in this suite - but it
is not a general expectation.
Something which could require a future change to this method: do we want to
make this explicit in the parameter?
`Map[BlockManagerId, Seq[(BlockId, Long, Int)]]` instead - matching what is
in `ShuffleBlockFetcherIterator` - with a util method to convert the current
usage to this form (essentially, pull this conversion into a method and delegate
to that for all current usages)?
##########
File path:
core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -123,6 +131,42 @@ class ShuffleBlockFetcherIteratorSuite extends
SparkFunSuite with PrivateMethodT
verify(wrappedInputStream.invokePrivate(delegateAccess()),
times(1)).close()
}
+ // scalastyle:off argcount
+ private def createShuffleBlockIteratorWithDefaults(
+ blocksByAddress: Map[BlockManagerId, (Traversable[BlockId], Long, Int)],
+ taskContext: Option[TaskContext] = None,
+ streamWrapperLimitSize: Option[Long] = None,
+ blockManager: Option[BlockManager] = None,
+ maxBytesInFlight: Long = Long.MaxValue,
+ maxReqsInFlight: Int = Int.MaxValue,
+ maxBlocksInFlightPerAddress: Int = Int.MaxValue,
+ maxReqSizeShuffleToMem: Int = Int.MaxValue,
+ detectCorrupt: Boolean = true,
+ detectCorruptUseExtraMemory: Boolean = true,
+ shuffleMetrics: Option[ShuffleReadMetricsReporter] = None,
+ doBatchFetch: Boolean = false): ShuffleBlockFetcherIterator = {
+ val tContext = taskContext.getOrElse(TaskContext.empty())
+ new ShuffleBlockFetcherIterator(
+ tContext,
+ transfer,
+ blockManager.getOrElse(createMockBlockManager()),
+ blocksByAddress.map { case (blockManagerId, (blocks, blockSize,
blockMapIndex)) =>
+ (blockManagerId, blocks.map(blockId => (blockId, blockSize,
blockMapIndex)).toSeq)
+ }.toIterator,
+ streamWrapperLimitSize
+ .map(limit => (_: BlockId, in: InputStream) => new
LimitedInputStream(in, limit))
+ .getOrElse((_: BlockId, in: InputStream) => in),
+ maxBytesInFlight,
+ maxReqsInFlight,
+ maxBlocksInFlightPerAddress,
+ maxReqSizeShuffleToMem,
+ detectCorrupt,
+ detectCorruptUseExtraMemory,
+
shuffleMetrics.getOrElse(tContext.taskMetrics().createTempShuffleReadMetrics()),
Review comment:
nit: `tContext.taskMetrics.createTempShuffleReadMetrics()`
##########
File path:
core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -703,40 +600,24 @@ class ShuffleBlockFetcherIteratorSuite extends
SparkFunSuite with PrivateMethodT
val sem = new Semaphore(0)
val corruptLocalBuffer = new FileSegmentManagedBuffer(null, new File("a"),
0, 100)
- val transfer = mock(classOf[BlockTransferService])
- when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any()))
- .thenAnswer((invocation: InvocationOnMock) => {
- val listener =
invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
- Future {
- // Return the first block, and then fail.
- listener.onBlockFetchSuccess(
- ShuffleBlockId(0, 0, 0).toString, blocks(ShuffleBlockId(0, 0, 0)))
- listener.onBlockFetchSuccess(
- ShuffleBlockId(0, 1, 0).toString, mockCorruptBuffer())
- listener.onBlockFetchSuccess(
- ShuffleBlockId(0, 2, 0).toString, corruptLocalBuffer)
- sem.release()
- }
- })
-
- val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long, Int)])](
- (remoteBmId, blocks.keys.map(blockId => (blockId, 1L,
0)).toSeq)).toIterator
+ answerFetchBlocks { invocation =>
+ val listener = invocation.getArgument[BlockFetchingListener](4)
+ Future {
+ // Return the first block, and then fail.
+ listener.onBlockFetchSuccess(
+ ShuffleBlockId(0, 0, 0).toString, blocks(ShuffleBlockId(0, 0, 0)))
+ listener.onBlockFetchSuccess(
+ ShuffleBlockId(0, 1, 0).toString, mockCorruptBuffer())
+ listener.onBlockFetchSuccess(
+ ShuffleBlockId(0, 2, 0).toString, corruptLocalBuffer)
+ sem.release()
+ }
+ }
- val taskContext = TaskContext.empty()
- val iterator = new ShuffleBlockFetcherIterator(
- taskContext,
- transfer,
- blockManager,
- blocksByAddress,
- (_, in) => new LimitedInputStream(in, 100),
- 48 * 1024 * 1024,
- Int.MaxValue,
- Int.MaxValue,
- Int.MaxValue,
- true,
- true,
- taskContext.taskMetrics.createTempShuffleReadMetrics(),
- false)
+ val iterator = createShuffleBlockIteratorWithDefaults(
+ Map(remoteBmId ->(blocks.keys, 1L, 0)),
Review comment:
super nit: space after `->` (here and below)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]