Github user sitalkedia commented on a diff in the pull request:
https://github.com/apache/spark/pull/18317#discussion_r138502867
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java
---
@@ -0,0 +1,313 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.spark.util.ThreadUtils;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
/**
 * {@link InputStream} implementation that asynchronously reads ahead from the underlying input
 * stream once the data remaining in the current buffer falls below a read-ahead threshold. It
 * does so by maintaining two buffers: an active buffer and a read-ahead buffer. The active buffer
 * contains the data returned when a read() call is issued. The read-ahead buffer is filled
 * asynchronously from the underlying input stream; once the active buffer is exhausted, the two
 * buffers are flipped so that reading can continue from the read-ahead buffer without blocking
 * on disk I/O.
 */
+public class ReadAheadInputStream extends InputStream {
+
+ private ReentrantLock stateChangeLock = new ReentrantLock();
+
+ @GuardedBy("stateChangeLock")
+ private ByteBuffer activeBuffer;
+
+ @GuardedBy("stateChangeLock")
+ private ByteBuffer readAheadBuffer;
+
+ @GuardedBy("stateChangeLock")
+ private boolean endOfStream;
+
+ @GuardedBy("stateChangeLock")
+ // true if async read is in progress
+ private boolean readInProgress;
+
+ @GuardedBy("stateChangeLock")
+ // true if read is aborted due to an exception in reading from
underlying input stream.
+ private boolean readAborted;
+
+ @GuardedBy("stateChangeLock")
+ private Exception readException;
+
+ // If the remaining data size in the current buffer is below this
threshold,
+ // we issue an async read from the underlying input stream.
+ private final int readAheadThresholdInBytes;
+
+ private final InputStream underlyingInputStream;
+
+ private final ExecutorService executorService =
ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");
+
+ private final Condition asyncReadComplete =
stateChangeLock.newCondition();
+
+ private static final ThreadLocal<byte[]> oneByte =
ThreadLocal.withInitial(() -> new byte[1]);
+
/**
 * Creates a <code>ReadAheadInputStream</code> with the specified buffer size and read-ahead
 * threshold.
 *
 * @param inputStream The underlying input stream.
 * @param bufferSizeInBytes The buffer size; two buffers of this size are allocated.
 * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead
 *                                  threshold, an async read is triggered.
 * @throws IllegalArgumentException if {@code bufferSizeInBytes} is not positive, or if
 *     {@code readAheadThresholdInBytes} is not strictly between 0 and {@code bufferSizeInBytes}.
 */
public ReadAheadInputStream(
    InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) {
  Preconditions.checkArgument(bufferSizeInBytes > 0,
      "bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes);
  Preconditions.checkArgument(readAheadThresholdInBytes > 0 &&
      readAheadThresholdInBytes < bufferSizeInBytes,
      // Keep the space before "value": the two literals are concatenated.
      "readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes, " +
      "but the value is " + readAheadThresholdInBytes);
  activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
  readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
  this.readAheadThresholdInBytes = readAheadThresholdInBytes;
  this.underlyingInputStream = inputStream;
  // A freshly allocated buffer has position 0 and limit == capacity; flipping it sets
  // limit = 0, so both buffers start out with no readable data.
  activeBuffer.flip();
  readAheadBuffer.flip();
}
+
+ private boolean isEndOfStream() {
+ return (!activeBuffer.hasRemaining() &&
!readAheadBuffer.hasRemaining() && endOfStream);
+ }
+
+ private void readAsync(final ByteBuffer byteBuffer) throws IOException {
+ stateChangeLock.lock();
+ final byte[] arr = byteBuffer.array();
+ try {
+ if (endOfStream || readInProgress) {
+ return;
+ }
+ byteBuffer.position(0);
+ byteBuffer.flip();
+ readInProgress = true;
+ } finally {
+ stateChangeLock.unlock();
+ }
+ executorService.execute(new Runnable() {
+ @Override
+ public void run() {
+ // Please note that it is safe to release the lock and read into
the read ahead buffer
+ // because either of following two conditions will hold - 1. The
active buffer has
+ // data available to read so the reader will not read from the
read ahead buffer.
+ // 2. This is the first time read is called or the active buffer
is exhausted,
+ // in that case the reader waits for this async read to complete.
+ // So there is no race condition in both the situations.
+ boolean handled = false;
+ int read = 0;
+ Exception exception = null;
+ try {
+ while (true) {
+ read = underlyingInputStream.read(arr);
+ if (0 != read) break;
+ }
+ handled = true;
+ } catch (Exception ex) {
+ exception = ex;
+ } finally {
+ stateChangeLock.lock();
+ if (read < 0 || (exception instanceof EOFException) ) {
+ endOfStream = true;
+ } else if (!handled) {
+ readAborted = true;
+ readException = exception != null ? exception: new
Exception("Unknown exception in ReadAheadInputStream");
+ } else {
+ byteBuffer.limit(read);
+ }
+ readInProgress = false;
+ signalAsyncReadComplete();
+ stateChangeLock.unlock();
+ }
+ }
+ });
+ }
+
+
+ private void signalAsyncReadComplete() {
+ stateChangeLock.lock();
+ try {
+ asyncReadComplete.signalAll();
+ } finally {
+ stateChangeLock.unlock();
+ }
+ }
+
+ private void waitForAsyncReadComplete() throws IOException {
+ stateChangeLock.lock();
+ try {
+ while (readInProgress)
+ asyncReadComplete.await();
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException(e.getMessage());
+ } finally {
+ stateChangeLock.unlock();
+ }
+ }
+
+ @Override
+ public int read() throws IOException {
+ int val = read(oneByte.get(), 0, 1);
+ if (val == -1) {
+ return -1;
+ }
+ return oneByte.get()[0] & 0xFF;
+ }
+
+ @Override
+ public int read(byte[] b, int offset, int len) throws IOException {
+ if (offset < 0 || len < 0 || offset + len < 0 || offset + len >
b.length) {
+ throw new IndexOutOfBoundsException();
+ }
+ if (len == 0) {
+ return 0;
+ }
+ stateChangeLock.lock();
+ try {
+ return readInternal(b, offset, len);
+ } finally {
+ stateChangeLock.unlock();
+ }
+ }
+
+ /**
+ * Internal read function which should be called only from read() api.
The assumption is that
+ * the stateChangeLock is already acquired in the caller before calling
this function.
+ */
+ private int readInternal(byte[] b, int offset, int len) throws
IOException {
+ assert (stateChangeLock.isLocked());
+ if (!activeBuffer.hasRemaining()) {
--- End diff --
Hmm, not really: if we call `skip(size of active buffer)`, we flip the active buffer and the
read-ahead buffer, so the active buffer will actually have remaining bytes.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]