Github user juliuszsompolski commented on a diff in the pull request:
https://github.com/apache/spark/pull/20555#discussion_r167971273
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java
---
@@ -258,54 +263,43 @@ public int read(byte[] b, int offset, int len) throws IOException {
if (len == 0) {
return 0;
}
- stateChangeLock.lock();
- try {
- return readInternal(b, offset, len);
- } finally {
- stateChangeLock.unlock();
- }
- }
- /**
- * flip the active and read ahead buffer
- */
- private void swapBuffers() {
- ByteBuffer temp = activeBuffer;
- activeBuffer = readAheadBuffer;
- readAheadBuffer = temp;
- }
-
- /**
- * Internal read function which should be called only from read() api. The assumption is that
- * the stateChangeLock is already acquired in the caller before calling this function.
- */
- private int readInternal(byte[] b, int offset, int len) throws IOException {
- assert (stateChangeLock.isLocked());
if (!activeBuffer.hasRemaining()) {
- waitForAsyncReadComplete();
- if (readAheadBuffer.hasRemaining()) {
- swapBuffers();
- } else {
- // The first read or activeBuffer is skipped.
- readAsync();
+ // No remaining in active buffer - lock and switch to write ahead buffer.
+ stateChangeLock.lock();
+ try {
waitForAsyncReadComplete();
- if (isEndOfStream()) {
- return -1;
+ if (!readAheadBuffer.hasRemaining()) {
+ // The first read or activeBuffer is skipped.
--- End diff --
That is, skipped using `skip()`.
I moved this comment over from a few lines above, but looking at `skip()`
now, I don't think it can happen: in that case the skip itself would trigger
a `readAsync` read.
I'll update the comment.
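To illustrate the reasoning, here is a minimal single-threaded sketch, not the actual `ReadAheadInputStream` code. The class `DoubleBufferSketch`, its `advance` and `fillReadAhead` helpers, and the `emptyReadAheadHits` counter are all hypothetical names, and the asynchronous read is replaced by a synchronous refill. It assumes that `skip()` refills the read-ahead buffer whenever it swaps buffers, which is the behavior described above.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

/** Hypothetical, simplified stand-in for a double-buffered read-ahead stream. */
class DoubleBufferSketch {
  private final InputStream in;
  private ByteBuffer active;
  private ByteBuffer readAhead;
  private boolean eof = false;
  /** Counts how often read() found the read-ahead buffer empty. */
  int emptyReadAheadHits = 0;

  DoubleBufferSketch(InputStream in, int bufferSize) {
    this.in = in;
    this.active = ByteBuffer.allocate(bufferSize);
    this.readAhead = ByteBuffer.allocate(bufferSize);
    // Both buffers start out empty (position == limit == 0).
    this.active.flip();
    this.readAhead.flip();
  }

  /** Synchronous stand-in for the asynchronous read-ahead. */
  private void fillReadAhead() throws IOException {
    readAhead.clear();
    int n = in.read(readAhead.array(), 0, readAhead.capacity());
    if (n <= 0) {
      eof = true;
      n = 0;
    }
    readAhead.limit(n);
  }

  private void swapBuffers() {
    ByteBuffer temp = active;
    active = readAhead;
    readAhead = temp;
  }

  /** Makes the active buffer non-empty if possible; returns false at end of stream. */
  private boolean advance(boolean calledFromRead) throws IOException {
    if (active.hasRemaining()) {
      return true;
    }
    if (!readAhead.hasRemaining()) {
      // In this sketch: the first access, or the end of the stream.
      if (calledFromRead) {
        emptyReadAheadHits++;
      }
      if (eof) {
        return false;
      }
      fillReadAhead();
      if (!readAhead.hasRemaining()) {
        return false;
      }
    }
    swapBuffers();
    fillReadAhead(); // Prefetch the next chunk right after every swap.
    return true;
  }

  int read() throws IOException {
    return advance(true) ? (active.get() & 0xFF) : -1;
  }

  long skip(long n) throws IOException {
    long skipped = 0;
    while (skipped < n && advance(false)) {
      int step = (int) Math.min(n - skipped, active.remaining());
      active.position(active.position() + step);
      skipped += step;
    }
    return skipped;
  }
}
```

In this sketch, reading one byte from a ten-byte stream with a buffer size of 4 and then calling `skip(3)` drains the active buffer, yet the next `read()` finds the read-ahead buffer already filled. The "read-ahead buffer is empty" branch is reached on the first read (and at end of stream), not because of a skip.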
---