Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/18317#discussion_r138200201
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java
---
@@ -0,0 +1,315 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.spark.storage.StorageUtils;
+import org.apache.spark.util.ThreadUtils;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * {@link InputStream} implementation which asynchronously reads ahead from the underlying input
+ * stream when a specified amount of data has been read from the current buffer. It does so by
+ * maintaining two buffers: an active buffer and a read-ahead buffer. The active buffer contains
+ * the data that is returned when a read() call is issued. The read-ahead buffer is used to
+ * asynchronously read from the underlying input stream; once the active buffer is exhausted, the
+ * two buffers are flipped so that reading can continue from the read-ahead buffer without being
+ * blocked on disk I/O.
+ */
+public class ReadAheadInputStream extends InputStream {
+
+  // Guards the mutable state below. Declared final so the lock object that every
+  // @GuardedBy("stateChangeLock") annotation refers to can never be swapped out.
+  private final ReentrantLock stateChangeLock = new ReentrantLock();
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer activeBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer readAheadBuffer;
+
+  // Consulted by isEndOfStream() and readAsync().
+  @GuardedBy("stateChangeLock")
+  private boolean endOfStream;
+
+  // True if an async read is in progress.
+  @GuardedBy("stateChangeLock")
+  private boolean readInProgress;
+
+  // True if a read was aborted due to an exception while reading from the underlying input stream.
+  @GuardedBy("stateChangeLock")
+  private boolean readAborted;
+
+  // NOTE(review): presumably the exception that caused readAborted to be set — confirm in readAsync.
+  @GuardedBy("stateChangeLock")
+  private Exception readException;
+
+  // If the remaining data size in the current buffer is below this threshold,
+  // we issue an async read from the underlying input stream.
+  private final int readAheadThresholdInBytes;
+
+  private final InputStream underlyingInputStream;
+
+  // Single daemon thread ("read-ahead") on which the asynchronous reads are scheduled.
+  private final ExecutorService executorService =
+      ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");
+
+  // NOTE(review): presumably signalled when an async read finishes — confirm against its users.
+  private final Condition asyncReadComplete = stateChangeLock.newCondition();
+
+  // Per-thread one-byte scratch array, shared across instances because it is static;
+  // presumably backs the single-byte read() overload (not visible in this excerpt).
+  private static final ThreadLocal<byte[]> oneByte = ThreadLocal.withInitial(() -> new byte[1]);
+
+  /**
+   * Creates a <code>ReadAheadInputStream</code> with the specified buffer size and read-ahead
+   * threshold.
+   *
+   * @param inputStream The underlying input stream; must not be null.
+   * @param bufferSizeInBytes The buffer size; must be greater than 0.
+   * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead
+   *                                  threshold, an async read is triggered. Must be greater
+   *                                  than 0 and less than bufferSizeInBytes.
+   * @throws NullPointerException if inputStream is null.
+   * @throws IllegalArgumentException if bufferSizeInBytes or readAheadThresholdInBytes is out
+   *                                  of range.
+   */
+  public ReadAheadInputStream(
+      InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) {
+    // Fail fast here rather than on the read-ahead thread at the first read.
+    Preconditions.checkNotNull(inputStream, "inputStream should not be null");
+    Preconditions.checkArgument(bufferSizeInBytes > 0,
+        "bufferSizeInBytes should be greater than 0, but the value is %s", bufferSizeInBytes);
+    Preconditions.checkArgument(
+        readAheadThresholdInBytes > 0 && readAheadThresholdInBytes < bufferSizeInBytes,
+        "readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes, " +
+            "but the value is %s (bufferSizeInBytes is %s)",
+        readAheadThresholdInBytes, bufferSizeInBytes);
+    activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+    readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+    this.readAheadThresholdInBytes = readAheadThresholdInBytes;
+    this.underlyingInputStream = inputStream;
+    // Flipping a freshly allocated buffer sets limit = position = 0, so both buffers start
+    // out with no remaining (readable) data.
+    activeBuffer.flip();
+    readAheadBuffer.flip();
+  }
+
+  /**
+   * Returns true when the endOfStream flag is set and neither the active buffer nor the
+   * read-ahead buffer has any remaining data.
+   *
+   * <p>Reads fields annotated {@code @GuardedBy("stateChangeLock")} without acquiring the lock
+   * itself, so callers must already hold {@code stateChangeLock}.
+   */
+  private boolean isEndOfStream() {
+    return !activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream;
+  }
+
+ private void readAsync(final ByteBuffer byteBuffer) throws IOException {
+ stateChangeLock.lock();
+ if (endOfStream || readInProgress) {
+ stateChangeLock.unlock();
+ return;
+ }
+ byteBuffer.position(0);
+ byteBuffer.flip();
+ readInProgress = true;
+ final byte[] arr = byteBuffer.array();
+ stateChangeLock.unlock();
--- End diff --
nit: put `stateChangeLock.unlock()` in `finally`
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]