Github user yucai commented on a diff in the pull request:
https://github.com/apache/spark/pull/19788#discussion_r152961094
--- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
@@ -196,12 +196,14 @@ private[spark] class IndexShuffleBlockResolver(
override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
// The block is actually going to be a range of a single map output file for this map, so
// find out the consolidated file, then the offset within that from our index
+ logDebug(s"Fetch block data for $blockId")
val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
val in = new DataInputStream(Files.newInputStream(indexFile.toPath))
try {
ByteStreams.skipFully(in, blockId.reduceId * 8)
val offset = in.readLong()
+ ByteStreams.skipFully(in, (blockId.length - 1) * 8)
--- End diff ---
Sure. For example, when startPartition = 3 and endPartition = 8, we need the range
[3, 8), so length = 5.
Line 204: ByteStreams.skipFully(in, 3 * 8) skips the index entries for partitions 0, 1, 2.
Line 205: offset = in.readLong() reads startPartition(3)'s offset.
Line 206: ByteStreams.skipFully(in, (5 - 1) * 8) skips the entries for partitions 4, 5, 6, 7.
Line 207: nextOffset = in.readLong() reads endPartition(8)'s offset.
When length is 1, a skip of zero is correct: there is nothing to skip over, and line 207's
readLong() directly reads endPartition's offset.
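To make the arithmetic above concrete, here is a small self-contained sketch of the same skip-then-read logic. It is written in plain Java rather than Scala so it runs standalone, it replaces Guava's ByteStreams.skipFully with a minimal skip loop, and it uses a fake in-memory index (offsets 0, 100, 200, ...) instead of a real shuffle index file; the names buildIndex and readRange are illustrative, not Spark APIs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class IndexSkipDemo {
    // Fake index file: numPartitions + 1 longs, partition i's offset is i * 100.
    static byte[] buildIndex(int numPartitions) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        for (int i = 0; i <= numPartitions; i++) {
            out.writeLong(i * 100L);
        }
        out.flush();
        return bos.toByteArray();
    }

    // Plain-Java stand-in for Guava's ByteStreams.skipFully.
    static void skipFully(DataInputStream in, long n) throws IOException {
        while (n > 0) {
            n -= in.skip(n);
        }
    }

    // Mirrors the patched resolver logic: skip startPartition entries, read
    // the start offset, skip (length - 1) entries, read the end offset.
    static long[] readRange(byte[] index, int startPartition, int length)
            throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(index));
        skipFully(in, startPartition * 8L);      // 8 bytes per long entry
        long offset = in.readLong();             // startPartition's offset
        skipFully(in, (length - 1) * 8L);        // skip the partitions in between
        long nextOffset = in.readLong();         // endPartition's offset
        in.close();
        return new long[] { offset, nextOffset };
    }

    public static void main(String[] args) throws IOException {
        byte[] index = buildIndex(10);
        // startPartition = 3, length = 5 covers [3, 8).
        long[] r = readRange(index, 3, 5);
        System.out.println(r[0] + ".." + r[1]);  // 300..800
        // length = 1: the second skip is (1 - 1) * 8 = 0 bytes.
        r = readRange(index, 3, 1);
        System.out.println(r[0] + ".." + r[1]);  // 300..400
    }
}
```

With length = 5 the second skip passes over the entries for partitions 4..7, so the next readLong lands on partition 8's offset; with length = 1 the skip is 0 bytes and the very next long is already the end offset, matching the comment above.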
---