mridulm commented on a change in pull request #32389:
URL: https://github.com/apache/spark/pull/32389#discussion_r632825877



##########
File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -123,6 +131,42 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     verify(wrappedInputStream.invokePrivate(delegateAccess()), times(1)).close()
   }
 
+  // scalastyle:off argcount
+  private def createShuffleBlockIteratorWithDefaults(
+      blocksByAddress: Map[BlockManagerId, (Traversable[BlockId], Long, Int)],
+      taskContext: Option[TaskContext] = None,
+      streamWrapperLimitSize: Option[Long] = None,
+      blockManager: Option[BlockManager] = None,
+      maxBytesInFlight: Long = Long.MaxValue,
+      maxReqsInFlight: Int = Int.MaxValue,
+      maxBlocksInFlightPerAddress: Int = Int.MaxValue,
+      maxReqSizeShuffleToMem: Int = Int.MaxValue,
+      detectCorrupt: Boolean = true,
+      detectCorruptUseExtraMemory: Boolean = true,
+      shuffleMetrics: Option[ShuffleReadMetricsReporter] = None,
+      doBatchFetch: Boolean = false): ShuffleBlockFetcherIterator = {
+    val tContext = taskContext.getOrElse(TaskContext.empty())
+    new ShuffleBlockFetcherIterator(
+      tContext,
+      transfer,
+      blockManager.getOrElse(createMockBlockManager()),
+      blocksByAddress.map { case (blockManagerId, (blocks, blockSize, blockMapIndex)) =>
+        (blockManagerId, blocks.map(blockId => (blockId, blockSize, blockMapIndex)).toSeq)
+      }.toIterator,
+      streamWrapperLimitSize
+        .map(limit => (_: BlockId, in: InputStream) => new LimitedInputStream(in, limit))
+        .getOrElse((_: BlockId, in: InputStream) => in),

Review comment:
       super nit:
   ```suggestion
         (_, in) => new LimitedInputStream(in, streamWrapperLimitSize.getOrElse(Long.MaxValue))
   ```
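
   For context, the two forms side by side (a sketch only; it assumes wrapping the stream in a `LimitedInputStream` with `Long.MaxValue` is an acceptable stand-in for returning the stream unwrapped):
   ```scala
   // Current two-branch form: wrap only when a limit is supplied.
   streamWrapperLimitSize
     .map(limit => (_: BlockId, in: InputStream) => new LimitedInputStream(in, limit))
     .getOrElse((_: BlockId, in: InputStream) => in)

   // Suggested single lambda: always wrap, defaulting the limit to Long.MaxValue.
   (_: BlockId, in: InputStream) =>
     new LimitedInputStream(in, streamWrapperLimitSize.getOrElse(Long.MaxValue))
   ```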

##########
File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -123,6 +131,42 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     verify(wrappedInputStream.invokePrivate(delegateAccess()), times(1)).close()
   }
 
+  // scalastyle:off argcount
+  private def createShuffleBlockIteratorWithDefaults(
+      blocksByAddress: Map[BlockManagerId, (Traversable[BlockId], Long, Int)],
+      taskContext: Option[TaskContext] = None,
+      streamWrapperLimitSize: Option[Long] = None,
+      blockManager: Option[BlockManager] = None,
+      maxBytesInFlight: Long = Long.MaxValue,
+      maxReqsInFlight: Int = Int.MaxValue,
+      maxBlocksInFlightPerAddress: Int = Int.MaxValue,
+      maxReqSizeShuffleToMem: Int = Int.MaxValue,
+      detectCorrupt: Boolean = true,
+      detectCorruptUseExtraMemory: Boolean = true,
+      shuffleMetrics: Option[ShuffleReadMetricsReporter] = None,
+      doBatchFetch: Boolean = false): ShuffleBlockFetcherIterator = {
+    val tContext = taskContext.getOrElse(TaskContext.empty())
+    new ShuffleBlockFetcherIterator(
+      tContext,
+      transfer,
+      blockManager.getOrElse(createMockBlockManager()),
+      blocksByAddress.map { case (blockManagerId, (blocks, blockSize, blockMapIndex)) =>
+        (blockManagerId, blocks.map(blockId => (blockId, blockSize, blockMapIndex)).toSeq)
+      }.toIterator,

Review comment:
       We are making the assumption that all blocks from a given block manager are the same size ...
   This is currently how the code happens to be written in this suite - but it is not a general expectation.
   Something which could require a future change to this method: do we want to make this explicit in the parameter?
   
   Use `Map[BlockManagerId, Seq[(BlockId, Long, Int)]]` instead - matching what is in `ShuffleBlockFetcherIterator` - with a util method to convert the current usage to this form (essentially, pull this conversion out into a method and delegate to it for all current usage)?
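
   For illustration, the conversion util could look something like this (a sketch only; the name `toBlocksByAddress` is hypothetical and the body is just the conversion already in the diff, pulled out into a method):
   ```scala
   // Hypothetical helper: converts the "same size / map index for every block of a
   // block manager" map used by the current tests into the per-block Seq form that
   // ShuffleBlockFetcherIterator expects.
   private def toBlocksByAddress(
       blocksByAddress: Map[BlockManagerId, (Traversable[BlockId], Long, Int)])
     : Map[BlockManagerId, Seq[(BlockId, Long, Int)]] = {
     blocksByAddress.map { case (blockManagerId, (blocks, blockSize, blockMapIndex)) =>
       blockManagerId -> blocks.map(blockId => (blockId, blockSize, blockMapIndex)).toSeq
     }
   }
   ```
   `createShuffleBlockIteratorWithDefaults` would then accept `Map[BlockManagerId, Seq[(BlockId, Long, Int)]]` directly, and current callers would go through this helper.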

##########
File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -123,6 +131,42 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     verify(wrappedInputStream.invokePrivate(delegateAccess()), times(1)).close()
   }
 
+  // scalastyle:off argcount
+  private def createShuffleBlockIteratorWithDefaults(
+      blocksByAddress: Map[BlockManagerId, (Traversable[BlockId], Long, Int)],
+      taskContext: Option[TaskContext] = None,
+      streamWrapperLimitSize: Option[Long] = None,
+      blockManager: Option[BlockManager] = None,
+      maxBytesInFlight: Long = Long.MaxValue,
+      maxReqsInFlight: Int = Int.MaxValue,
+      maxBlocksInFlightPerAddress: Int = Int.MaxValue,
+      maxReqSizeShuffleToMem: Int = Int.MaxValue,
+      detectCorrupt: Boolean = true,
+      detectCorruptUseExtraMemory: Boolean = true,
+      shuffleMetrics: Option[ShuffleReadMetricsReporter] = None,
+      doBatchFetch: Boolean = false): ShuffleBlockFetcherIterator = {
+    val tContext = taskContext.getOrElse(TaskContext.empty())
+    new ShuffleBlockFetcherIterator(
+      tContext,
+      transfer,
+      blockManager.getOrElse(createMockBlockManager()),
+      blocksByAddress.map { case (blockManagerId, (blocks, blockSize, blockMapIndex)) =>
+        (blockManagerId, blocks.map(blockId => (blockId, blockSize, blockMapIndex)).toSeq)
+      }.toIterator,
+      streamWrapperLimitSize
+        .map(limit => (_: BlockId, in: InputStream) => new LimitedInputStream(in, limit))
+        .getOrElse((_: BlockId, in: InputStream) => in),
+      maxBytesInFlight,
+      maxReqsInFlight,
+      maxBlocksInFlightPerAddress,
+      maxReqSizeShuffleToMem,
+      detectCorrupt,
+      detectCorruptUseExtraMemory,
+      shuffleMetrics.getOrElse(tContext.taskMetrics().createTempShuffleReadMetrics()),

Review comment:
       nit: `tContext.taskMetrics.createTempShuffleReadMetrics()`

##########
File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -703,40 +600,24 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     val sem = new Semaphore(0)
     val corruptLocalBuffer = new FileSegmentManagedBuffer(null, new File("a"), 0, 100)
 
-    val transfer = mock(classOf[BlockTransferService])
-    when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any()))
-      .thenAnswer((invocation: InvocationOnMock) => {
-        val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
-        Future {
-          // Return the first block, and then fail.
-          listener.onBlockFetchSuccess(
-            ShuffleBlockId(0, 0, 0).toString, blocks(ShuffleBlockId(0, 0, 0)))
-          listener.onBlockFetchSuccess(
-            ShuffleBlockId(0, 1, 0).toString, mockCorruptBuffer())
-          listener.onBlockFetchSuccess(
-            ShuffleBlockId(0, 2, 0).toString, corruptLocalBuffer)
-          sem.release()
-        }
-      })
-
-    val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long, Int)])](
-      (remoteBmId, blocks.keys.map(blockId => (blockId, 1L, 0)).toSeq)).toIterator
+    answerFetchBlocks { invocation =>
+      val listener = invocation.getArgument[BlockFetchingListener](4)
+      Future {
+        // Return the first block, and then fail.
+        listener.onBlockFetchSuccess(
+          ShuffleBlockId(0, 0, 0).toString, blocks(ShuffleBlockId(0, 0, 0)))
+        listener.onBlockFetchSuccess(
+          ShuffleBlockId(0, 1, 0).toString, mockCorruptBuffer())
+        listener.onBlockFetchSuccess(
+          ShuffleBlockId(0, 2, 0).toString, corruptLocalBuffer)
+        sem.release()
+      }
+    }
 
-    val taskContext = TaskContext.empty()
-    val iterator = new ShuffleBlockFetcherIterator(
-      taskContext,
-      transfer,
-      blockManager,
-      blocksByAddress,
-      (_, in) => new LimitedInputStream(in, 100),
-      48 * 1024 * 1024,
-      Int.MaxValue,
-      Int.MaxValue,
-      Int.MaxValue,
-      true,
-      true,
-      taskContext.taskMetrics.createTempShuffleReadMetrics(),
-      false)
+    val iterator = createShuffleBlockIteratorWithDefaults(
+      Map(remoteBmId ->(blocks.keys, 1L, 0)),

Review comment:
       super nit: space after `->` (here and below)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


