GitHub user jerryshao commented on a diff in the pull request:
https://github.com/apache/spark/pull/19788#discussion_r152891438
--- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
@@ -196,12 +196,14 @@ private[spark] class IndexShuffleBlockResolver(
override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
// The block is actually going to be a range of a single map output file for this map, so
// find out the consolidated file, then the offset within that from our index
+ logDebug(s"Fetch block data for $blockId")
val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
val in = new DataInputStream(Files.newInputStream(indexFile.toPath))
try {
ByteStreams.skipFully(in, blockId.reduceId * 8)
val offset = in.readLong()
+ ByteStreams.skipFully(in, (blockId.length - 1) * 8)
--- End diff --
I suspect this line is not correct; it seems to change the semantics. For
example, if startPartition is 3 and endPartition is 8, originally it should be
(3\*8), but now it changes to (4\*8). Can you please explain more?
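
To make the arithmetic under discussion concrete, here is a minimal sketch of the two skips against an in-memory index. It is not the PR's code, and it rests on several assumptions: the index holds one 8-byte cumulative offset per partition boundary, `blockId.length` means the number of contiguous reduce partitions (endPartition - startPartition), and the statement after the added line reads the end offset, which the diff excerpt does not show. The names `IndexRangeSketch`, `buildIndex`, and `readRange` are hypothetical, and the local `skipFully` is a stand-in for Guava's `ByteStreams.skipFully`.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object IndexRangeSketch {
  // Build an in-memory index: one cumulative offset (8 bytes) per partition
  // boundary, so n partitions produce n + 1 longs. (Assumed layout.)
  def buildIndex(lengths: Seq[Long]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    var offset = 0L
    out.writeLong(offset)
    lengths.foreach { len =>
      offset += len
      out.writeLong(offset)
    }
    out.flush()
    bytes.toByteArray
  }

  // Skip exactly n bytes; local stand-in for Guava's ByteStreams.skipFully.
  private def skipFully(in: DataInputStream, n: Int): Unit = {
    var remaining = n
    while (remaining > 0) {
      val skipped = in.skipBytes(remaining)
      require(skipped > 0, "unexpected end of index")
      remaining -= skipped
    }
  }

  // Mirrors the two skips from the diff. numPartitions plays the role of
  // blockId.length (an assumption about its meaning).
  def readRange(index: Array[Byte], startPartition: Int, numPartitions: Int): (Long, Long) = {
    val in = new DataInputStream(new ByteArrayInputStream(index))
    try {
      skipFully(in, startPartition * 8)      // move to entry startPartition
      val offset = in.readLong()             // consumes 8 bytes, stream is now at entry startPartition + 1
      skipFully(in, (numPartitions - 1) * 8) // move to entry startPartition + numPartitions
      val nextOffset = in.readLong()         // assumed follow-up read of the end offset
      (offset, nextOffset)
    } finally {
      in.close()
    }
  }
}
```

Under these assumptions, the first skip is still 3\*8 for startPartition 3. The added skip of (5 - 1)\*8 is applied after `readLong()` has already advanced the stream by 8 bytes, so the following read lands on entry 8. Whether that matches the intended meaning of `blockId.length` is the question for the PR author.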