GitHub user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/20179#discussion_r160349274
--- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
@@ -196,11 +196,24 @@ private[spark] class IndexShuffleBlockResolver(
// find out the consolidated file, then the offset within that from our index
val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
- val in = new DataInputStream(new FileInputStream(indexFile))
+ // SPARK-22982: if this FileInputStream's position is seeked forward by another piece of code
+ // which is incorrectly using our file descriptor then this code will fetch the wrong offsets
+ // (which may cause a reducer to be sent a different reducer's data). The explicit position
+ // checks added here were a useful debugging aid during SPARK-22982 and may help prevent this
+ // class of issue from re-occurring in the future which is why they are left here even though
+ // SPARK-22982 is fixed.
+ val channel = Files.newByteChannel(indexFile.toPath)
+ channel.position(blockId.reduceId * 8)
--- End diff --
It's used so that bugs like "asynchronous close()" are detected earlier if they occur in the future.
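
For readers following along, here is a minimal sketch of the kind of position check the diff comment describes. It is not the code from the PR: the object name `IndexOffsetCheck`, the helpers `readOffsets` and `writeIndex`, and the error message are all hypothetical, and the index file is assumed to be a plain sequence of 8-byte longs. The idea is to seek the channel to the reducer's entry, read two offsets, and then confirm the channel sits exactly 16 bytes past the starting position.

```scala
import java.io.{DataInputStream, DataOutputStream, File, FileOutputStream, IOException}
import java.nio.channels.Channels
import java.nio.file.Files

object IndexOffsetCheck {

  // Hypothetical helper (not from the PR): reads the start and end offsets for
  // one reducer from an index file assumed to hold consecutive 8-byte longs,
  // then verifies that the channel position moved by exactly 16 bytes.
  def readOffsets(indexFile: File, reduceId: Int): (Long, Long) = {
    val channel = Files.newByteChannel(indexFile.toPath)
    // Use 8L so the multiplication is done in Long arithmetic.
    val startPosition = reduceId * 8L
    channel.position(startPosition)
    val in = new DataInputStream(Channels.newInputStream(channel))
    try {
      val offset = in.readLong()
      val nextOffset = in.readLong()
      val actualPosition = channel.position()
      val expectedPosition = startPosition + 16
      // If other code moved this channel's position behind our back, the
      // offsets just read cannot be trusted, so fail loudly instead.
      if (actualPosition != expectedPosition) {
        throw new IOException(
          s"Unexpected channel position: expected $expectedPosition but was $actualPosition")
      }
      (offset, nextOffset)
    } finally {
      // Closing the stream also closes the underlying channel.
      in.close()
    }
  }

  // Hypothetical helper used only to build a small index file for experiments.
  def writeIndex(indexFile: File, offsets: Seq[Long]): Unit = {
    val out = new DataOutputStream(new FileOutputStream(indexFile))
    try {
      offsets.foreach(o => out.writeLong(o))
    } finally {
      out.close()
    }
  }
}
```

With an index written as `Seq(0L, 10L, 25L, 40L)`, calling `IndexOffsetCheck.readOffsets(file, 2)` under these assumptions would return the pair of offsets stored at byte positions 16 and 24. The check itself only fires when the position has been disturbed, which is what makes it useful as an early warning.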