Github user pathikrit commented on a diff in the pull request:
https://github.com/apache/spark/pull/20179#discussion_r160304869
--- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
@@ -196,11 +196,24 @@ private[spark] class IndexShuffleBlockResolver(
     // find out the consolidated file, then the offset within that from our index
     val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
-    val in = new DataInputStream(new FileInputStream(indexFile))
+    // SPARK-22982: if this FileInputStream's position is seeked forward by another piece of code
+    // which is incorrectly using our file descriptor then this code will fetch the wrong offsets
+    // (which may cause a reducer to be sent a different reducer's data). The explicit position
+    // checks added here were a useful debugging aid during SPARK-22982 and may help prevent this
+    // class of issue from re-occurring in the future which is why they are left here even though
+    // SPARK-22982 is fixed.
+    val channel = Files.newByteChannel(indexFile.toPath)
+    channel.position(blockId.reduceId * 8)
+    val in = new DataInputStream(Channels.newInputStream(channel))
     try {
-      ByteStreams.skipFully(in, blockId.reduceId * 8)
       val offset = in.readLong()
       val nextOffset = in.readLong()
+      val actualPosition = channel.position()
+      val expectedPosition = blockId.reduceId * 8 + 16
+      if (actualPosition != expectedPosition) {
--- End diff --
Maybe an assert, `assert(actualPosition == expectedPosition, $msg)`, would be
better for things like this, so we can elide the checks using compiler flags if desired.
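
A minimal sketch of what this suggestion would look like (hypothetical object and method names, not from the PR). Scala's `Predef.assert` is annotated `@elidable(ASSERTION)`, so compiling with `-Xelide-below` set above the `ASSERTION` level (2000) removes both the check and the message construction from the generated bytecode:

```scala
// Hypothetical illustration of the assert-based position check.
object PositionCheck {
  def checkPosition(reduceId: Long, actualPosition: Long): Unit = {
    val expectedPosition = reduceId * 8 + 16
    // The message argument is by-name, so the interpolated string is only
    // built when the assertion actually fails; with e.g. -Xelide-below 2001
    // the whole call is compiled away.
    assert(actualPosition == expectedPosition,
      s"SPARK-22982: channel position $actualPosition != expected $expectedPosition")
  }
}
```

One trade-off: an elided assert gives zero overhead in production builds, but an always-on `if` check (as in the PR) keeps the safety net active in exactly the deployments where a corrupted file-descriptor position would otherwise go unnoticed.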
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]