GitHub user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18317#discussion_r138201264
  
    --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---
    @@ -0,0 +1,315 @@
    +/*
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.io;
    +
    +import com.google.common.base.Preconditions;
    +import org.apache.spark.storage.StorageUtils;
    +import org.apache.spark.util.ThreadUtils;
    +
    +import javax.annotation.concurrent.GuardedBy;
    +import java.io.EOFException;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.nio.ByteBuffer;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.locks.Condition;
    +import java.util.concurrent.locks.ReentrantLock;
    +
    +/**
    + * {@link InputStream} implementation which asynchronously reads ahead from the underlying input
    + * stream when a specified amount of data has been read from the current buffer. It does so by
    + * maintaining two buffers - an active buffer and a read-ahead buffer. The active buffer contains
    + * data which should be returned when a read() call is issued. The read-ahead buffer is used to
    + * asynchronously read from the underlying input stream, and once the current active buffer is
    + * exhausted, we flip the two buffers so that we can start reading from the read-ahead buffer
    + * without being blocked on disk I/O.
    + */
    +public class ReadAheadInputStream extends InputStream {
    +
    +  // Guards all mutable read state below. Final: asyncReadComplete is derived from this lock, so
    +  // the lock reference must never change.
    +  private final ReentrantLock stateChangeLock = new ReentrantLock();
    +
    +  // Buffer that read() calls are served from.
    +  @GuardedBy("stateChangeLock")
    +  private ByteBuffer activeBuffer;
    +
    +  // Buffer that the background task fills from the underlying input stream.
    +  @GuardedBy("stateChangeLock")
    +  private ByteBuffer readAheadBuffer;
    +
    +  // Set once the underlying input stream has reported end of stream.
    +  @GuardedBy("stateChangeLock")
    +  private boolean endOfStream;
    +
    +  // true if async read is in progress
    +  @GuardedBy("stateChangeLock")
    +  private boolean readInProgress;
    +
    +  // true if read is aborted due to an exception in reading from underlying input stream.
    +  @GuardedBy("stateChangeLock")
    +  private boolean readAborted;
    +
    +  // The failure recorded by the background read when readAborted is set.
    +  @GuardedBy("stateChangeLock")
    +  private Exception readException;
    +
    +  // If the remaining data size in the current buffer is below this threshold,
    +  // we issue an async read from the underlying input stream.
    +  private final int readAheadThresholdInBytes;
    +
    +  private final InputStream underlyingInputStream;
    +
    +  private final ExecutorService executorService =
    +      ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");
    +
    +  // Signalled whenever an async read finishes (successfully or not).
    +  private final Condition asyncReadComplete = stateChangeLock.newCondition();
    +
    +  private static final ThreadLocal<byte[]> oneByte = ThreadLocal.withInitial(() -> new byte[1]);
    +
    +  /**
    +   * Creates a <code>ReadAheadInputStream</code> with the specified buffer size and read-ahead
    +   * threshold.
    +   *
    +   * @param inputStream the underlying input stream to read ahead from.
    +   * @param bufferSizeInBytes the capacity of each of the two internal buffers; must be positive.
    +   * @param readAheadThresholdInBytes if the active buffer has less data than this threshold, an
    +   *     async read is triggered; must be positive and strictly less than
    +   *     {@code bufferSizeInBytes}.
    +   * @throws IllegalArgumentException if either size constraint is violated.
    +   */
    +  public ReadAheadInputStream(
    +      InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) {
    +    Preconditions.checkArgument(bufferSizeInBytes > 0,
    +        "bufferSizeInBytes should be greater than 0");
    +    boolean thresholdInRange =
    +        readAheadThresholdInBytes > 0 && readAheadThresholdInBytes < bufferSizeInBytes;
    +    Preconditions.checkArgument(thresholdInRange,
    +        "readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes");
    +    this.readAheadThresholdInBytes = readAheadThresholdInBytes;
    +    this.underlyingInputStream = inputStream;
    +    // Flipping a freshly allocated buffer leaves it empty (position = limit = 0):
    +    // nothing has been read from the underlying stream yet.
    +    this.activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
    +    this.activeBuffer.flip();
    +    this.readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
    +    this.readAheadBuffer.flip();
    +  }
    +
    +  /**
    +   * Reports whether the underlying stream has hit end of stream and neither buffer has any
    +   * unread bytes left.
    +   */
    +  private boolean isEndOfStream() {
    +    boolean buffersDrained =
    +        !(activeBuffer.hasRemaining() || readAheadBuffer.hasRemaining());
    +    return buffersDrained && endOfStream;
    +  }
    +  /**
    +   * Starts an asynchronous read from the underlying input stream into {@code byteBuffer}, unless
    +   * end of stream has already been reached or another async read is still running.
    +   *
    +   * <p>When the background read finishes it sets exactly one of: {@code endOfStream} (EOF),
    +   * {@code readAborted}/{@code readException} (failure), or the buffer's limit to the number of
    +   * bytes read (success). It then clears {@code readInProgress} and signals
    +   * {@code asyncReadComplete}.
    +   *
    +   * @param byteBuffer the buffer to fill; it is reset to empty (position 0, limit 0) before the
    +   *     read is scheduled.
    +   * @throws IOException declared for callers; not thrown by this method itself.
    +   * @throws RuntimeException if the executor refuses the task (e.g. after shutdown). The
    +   *     in-progress flag is reset before rethrowing so no reader waits for a read that never
    +   *     started.
    +   */
    +  private void readAsync(final ByteBuffer byteBuffer) throws IOException {
    +    // The backing array of a heap ByteBuffer never changes, so it can be captured up front.
    +    final byte[] arr = byteBuffer.array();
    +    stateChangeLock.lock();
    +    try {
    +      if (endOfStream || readInProgress) {
    +        return;
    +      }
    +      // Empty the buffer (position = limit = 0) until the async read sets the real limit.
    +      byteBuffer.position(0);
    +      byteBuffer.flip();
    +      readInProgress = true;
    +    } finally {
    +      stateChangeLock.unlock();
    +    }
    +    try {
    +      executorService.execute(() -> {
    +        // Please note that it is safe to release the lock and read into the read ahead buffer
    +        // because either of following two conditions will hold - 1. The active buffer has
    +        // data available to read so the reader will not read from the read ahead buffer.
    +        // 2. This is the first time read is called or the active buffer is exhausted,
    +        // in that case the reader waits for this async read to complete.
    +        // So there is no race condition in both the situations.
    +        int read = 0;
    +        Throwable failure = null;
    +        try {
    +          do {
    +            read = underlyingInputStream.read(arr);
    +          } while (read == 0);
    +        } catch (Throwable t) {
    +          failure = t;
    +        }
    +        stateChangeLock.lock();
    +        try {
    +          if (read < 0 || failure instanceof EOFException) {
    +            endOfStream = true;
    +          } else if (failure != null) {
    +            readAborted = true;
    +            // readException is typed Exception: wrap non-Exception throwables, keeping the cause.
    +            readException = failure instanceof Exception
    +                ? (Exception) failure
    +                : new Exception("Unknown exception in ReadAheadInputStream", failure);
    +          } else {
    +            byteBuffer.limit(read);
    +          }
    +          readInProgress = false;
    +          signalAsyncReadComplete();
    +        } finally {
    +          stateChangeLock.unlock();
    +        }
    +        if (failure instanceof Error) {
    +          // Fatal errors still propagate to the executor thread, after waiters are released.
    +          throw (Error) failure;
    +        }
    +      });
    +    } catch (RuntimeException e) {
    +      // e.g. RejectedExecutionException: the read never started, so undo the in-progress flag
    +      // and wake any waiter instead of leaving it blocked forever.
    +      stateChangeLock.lock();
    +      try {
    +        readInProgress = false;
    +        signalAsyncReadComplete();
    +      } finally {
    +        stateChangeLock.unlock();
    +      }
    +      throw e;
    +    }
    +  }
    +
    +
    +  private void signalAsyncReadComplete() {
    +    stateChangeLock.lock();
    +    try {
    +      asyncReadComplete.signalAll();
    +    } finally {
    +      stateChangeLock.unlock();
    +    }
    +  }
    +
    +  private void waitForAsyncReadComplete() {
    +    stateChangeLock.lock();
    +    try {
    +      if (readInProgress)
    +      asyncReadComplete.await();
    +    } catch (InterruptedException e) {
    --- End diff --
    
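    Ditto: don't swallow `InterruptedException` here either. Restore the thread's interrupt status (`Thread.currentThread().interrupt()`) or propagate it, e.g. as an `InterruptedIOException`, instead of silently ignoring it.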


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to