xkrogen commented on a change in pull request #32389:
URL: https://github.com/apache/spark/pull/32389#discussion_r633761349



##########
File path: 
core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -123,6 +131,42 @@ class ShuffleBlockFetcherIteratorSuite extends 
SparkFunSuite with PrivateMethodT
     verify(wrappedInputStream.invokePrivate(delegateAccess()), 
times(1)).close()
   }
 
+  // scalastyle:off argcount
+  private def createShuffleBlockIteratorWithDefaults(
+      blocksByAddress: Map[BlockManagerId, (Traversable[BlockId], Long, Int)],
+      taskContext: Option[TaskContext] = None,
+      streamWrapperLimitSize: Option[Long] = None,
+      blockManager: Option[BlockManager] = None,
+      maxBytesInFlight: Long = Long.MaxValue,
+      maxReqsInFlight: Int = Int.MaxValue,
+      maxBlocksInFlightPerAddress: Int = Int.MaxValue,
+      maxReqSizeShuffleToMem: Int = Int.MaxValue,
+      detectCorrupt: Boolean = true,
+      detectCorruptUseExtraMemory: Boolean = true,
+      shuffleMetrics: Option[ShuffleReadMetricsReporter] = None,
+      doBatchFetch: Boolean = false): ShuffleBlockFetcherIterator = {
+    val tContext = taskContext.getOrElse(TaskContext.empty())
+    new ShuffleBlockFetcherIterator(
+      tContext,
+      transfer,
+      blockManager.getOrElse(createMockBlockManager()),
+      blocksByAddress.map { case (blockManagerId, (blocks, blockSize, 
blockMapIndex)) =>
+        (blockManagerId, blocks.map(blockId => (blockId, blockSize, 
blockMapIndex)).toSeq)
+      }.toIterator,

Review comment:
       I changed my mind on this; the increase in generality really introduced 
very little complexity. It took me less time to make the change than it did to 
write my initial response to your comment :)

##########
File path: 
core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -123,6 +131,42 @@ class ShuffleBlockFetcherIteratorSuite extends 
SparkFunSuite with PrivateMethodT
     verify(wrappedInputStream.invokePrivate(delegateAccess()), 
times(1)).close()
   }
 
+  // scalastyle:off argcount
+  private def createShuffleBlockIteratorWithDefaults(
+      blocksByAddress: Map[BlockManagerId, (Traversable[BlockId], Long, Int)],
+      taskContext: Option[TaskContext] = None,
+      streamWrapperLimitSize: Option[Long] = None,
+      blockManager: Option[BlockManager] = None,
+      maxBytesInFlight: Long = Long.MaxValue,
+      maxReqsInFlight: Int = Int.MaxValue,
+      maxBlocksInFlightPerAddress: Int = Int.MaxValue,
+      maxReqSizeShuffleToMem: Int = Int.MaxValue,
+      detectCorrupt: Boolean = true,
+      detectCorruptUseExtraMemory: Boolean = true,
+      shuffleMetrics: Option[ShuffleReadMetricsReporter] = None,
+      doBatchFetch: Boolean = false): ShuffleBlockFetcherIterator = {
+    val tContext = taskContext.getOrElse(TaskContext.empty())
+    new ShuffleBlockFetcherIterator(
+      tContext,
+      transfer,
+      blockManager.getOrElse(createMockBlockManager()),
+      blocksByAddress.map { case (blockManagerId, (blocks, blockSize, 
blockMapIndex)) =>
+        (blockManagerId, blocks.map(blockId => (blockId, blockSize, 
blockMapIndex)).toSeq)
+      }.toIterator,
+      streamWrapperLimitSize
+        .map(limit => (_: BlockId, in: InputStream) => new 
LimitedInputStream(in, limit))
+        .getOrElse((_: BlockId, in: InputStream) => in),

Review comment:
       Actually unfortunately this doesn't work because some of the tests 
assume they have direct access to the input stream buffer to be able to do some 
mock verification. So it's better not to wrap.
   
   But I was able to simplify it to:
   ```
   (_, in) => streamWrapperLimitSize.map(new LimitedInputStream(in, 
_)).getOrElse(in)
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to