zhouyejoe commented on a change in pull request #32007:
URL: https://github.com/apache/spark/pull/32007#discussion_r648715501



##########
File path: 
core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
##########
@@ -161,4 +166,78 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite 
with BeforeAndAfterEa
     val resolver = new IndexShuffleBlockResolver(conf, blockManager)
     assert(resolver.getMigrationBlocks(ShuffleBlockInfo(Int.MaxValue, 
Long.MaxValue)).isEmpty)
   }
+
+  test("getMergedBlockData should return expected FileSegmentManagedBuffer 
list") {
+    val shuffleId = 1
+    val reduceId = 1
+    val dataFileName = s"shuffleMerged_${appId}_${shuffleId}_$reduceId.data"
+    val dataFile = new File(tempDir.getAbsolutePath, dataFileName)
+    val out = new FileOutputStream(dataFile)
+    Utils.tryWithSafeFinally {
+      out.write(new Array[Byte](30))
+    } {
+      out.close()
+    }
+    val indexFileName = s"shuffleMerged_${appId}_${shuffleId}_$reduceId.index"
+    prepareMergedShuffleIndexFile(indexFileName)
+    val resolver = new IndexShuffleBlockResolver(conf, blockManager)
+    val dirs = Some(Array[String](tempDir.getAbsolutePath))
+    val managedBufferList =
+      resolver.getMergedBlockData(ShuffleBlockId(shuffleId, -1, reduceId), 
dirs)
+    assert(managedBufferList.size === 3)
+    assert(managedBufferList(0).size === 10)
+    assert(managedBufferList(1).size === 0)
+    assert(managedBufferList(2).size === 20)
+  }
+
+  test("getMergedBlockMeta should return expected MergedBlockMeta") {
+    val shuffleId = 1
+    val reduceId = 1
+    val metaFileName = s"shuffleMerged_${appId}_${shuffleId}_$reduceId.meta"
+    val metaFile = new File(tempDir.getAbsolutePath, metaFileName)
+    val chunkTracker = new RoaringBitmap()
+    chunkTracker.add(1)
+    chunkTracker.add(2)
+    val metaFileOutputStream = new FileOutputStream(metaFile)
+    val outMeta = new DataOutputStream(metaFileOutputStream)
+    Utils.tryWithSafeFinally {
+      chunkTracker.serialize(outMeta)
+      chunkTracker.clear()
+      chunkTracker.add(3)
+      chunkTracker.add(4)
+      chunkTracker.serialize(outMeta)
+      chunkTracker.clear()
+      chunkTracker.add(5)
+      chunkTracker.add(6)
+      chunkTracker.serialize(outMeta)
+    }{
+      outMeta.close()
+    }
+    val indexFileName = s"shuffleMerged_${appId}_${shuffleId}_$reduceId.index"
+    prepareMergedShuffleIndexFile(indexFileName)
+    val resolver = new IndexShuffleBlockResolver(conf, blockManager)
+    val dirs = Some(Array[String](tempDir.getAbsolutePath))
+    val mergedBlockMeta =
+      resolver.getMergedBlockMeta(ShuffleBlockId(shuffleId, -1, reduceId), 
dirs)
+    assert(mergedBlockMeta.getNumChunks === 3)
+    assert(mergedBlockMeta.readChunkBitmaps().size === 3)
+    assert(mergedBlockMeta.readChunkBitmaps()(0).contains(1))
+    assert(mergedBlockMeta.readChunkBitmaps()(0).contains(2))

Review comment:
       Added the checks, and also added a check for invalid mapIds in the bitmap.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to