xkrogen commented on a change in pull request #32389:
URL: https://github.com/apache/spark/pull/32389#discussion_r633761349
##########
File path:
core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -123,6 +131,42 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     verify(wrappedInputStream.invokePrivate(delegateAccess()), times(1)).close()
}
+ // scalastyle:off argcount
+ private def createShuffleBlockIteratorWithDefaults(
+ blocksByAddress: Map[BlockManagerId, (Traversable[BlockId], Long, Int)],
+ taskContext: Option[TaskContext] = None,
+ streamWrapperLimitSize: Option[Long] = None,
+ blockManager: Option[BlockManager] = None,
+ maxBytesInFlight: Long = Long.MaxValue,
+ maxReqsInFlight: Int = Int.MaxValue,
+ maxBlocksInFlightPerAddress: Int = Int.MaxValue,
+ maxReqSizeShuffleToMem: Int = Int.MaxValue,
+ detectCorrupt: Boolean = true,
+ detectCorruptUseExtraMemory: Boolean = true,
+ shuffleMetrics: Option[ShuffleReadMetricsReporter] = None,
+ doBatchFetch: Boolean = false): ShuffleBlockFetcherIterator = {
+ val tContext = taskContext.getOrElse(TaskContext.empty())
+ new ShuffleBlockFetcherIterator(
+ tContext,
+ transfer,
+ blockManager.getOrElse(createMockBlockManager()),
+ blocksByAddress.map { case (blockManagerId, (blocks, blockSize,
blockMapIndex)) =>
+ (blockManagerId, blocks.map(blockId => (blockId, blockSize,
blockMapIndex)).toSeq)
+ }.toIterator,
Review comment:
I changed my mind on this; the increase in generality really introduced
very little complexity. It took me less time to make the change than it did to
write my initial response to your comment :)
##########
File path:
core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -123,6 +131,42 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     verify(wrappedInputStream.invokePrivate(delegateAccess()), times(1)).close()
}
+ // scalastyle:off argcount
+ private def createShuffleBlockIteratorWithDefaults(
+ blocksByAddress: Map[BlockManagerId, (Traversable[BlockId], Long, Int)],
+ taskContext: Option[TaskContext] = None,
+ streamWrapperLimitSize: Option[Long] = None,
+ blockManager: Option[BlockManager] = None,
+ maxBytesInFlight: Long = Long.MaxValue,
+ maxReqsInFlight: Int = Int.MaxValue,
+ maxBlocksInFlightPerAddress: Int = Int.MaxValue,
+ maxReqSizeShuffleToMem: Int = Int.MaxValue,
+ detectCorrupt: Boolean = true,
+ detectCorruptUseExtraMemory: Boolean = true,
+ shuffleMetrics: Option[ShuffleReadMetricsReporter] = None,
+ doBatchFetch: Boolean = false): ShuffleBlockFetcherIterator = {
+ val tContext = taskContext.getOrElse(TaskContext.empty())
+ new ShuffleBlockFetcherIterator(
+ tContext,
+ transfer,
+ blockManager.getOrElse(createMockBlockManager()),
+     blocksByAddress.map { case (blockManagerId, (blocks, blockSize, blockMapIndex)) =>
+       (blockManagerId, blocks.map(blockId => (blockId, blockSize, blockMapIndex)).toSeq)
+     }.toIterator,
+     streamWrapperLimitSize
+       .map(limit => (_: BlockId, in: InputStream) => new LimitedInputStream(in, limit))
+       .getOrElse((_: BlockId, in: InputStream) => in),
Review comment:
Actually unfortunately this doesn't work because some of the tests
assume they have direct access to the input stream buffer to be able to do some
mock verification. So it's better not to wrap.
But I was able to simplify it to:
```
(_, in) => streamWrapperLimitSize.map(new LimitedInputStream(in, _)).getOrElse(in)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]