Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18317#discussion_r138198579
  
    --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---
    @@ -0,0 +1,315 @@
    +/*
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.io;
    +
    +import com.google.common.base.Preconditions;
    +import org.apache.spark.storage.StorageUtils;
    +import org.apache.spark.util.ThreadUtils;
    +
    +import javax.annotation.concurrent.GuardedBy;
    +import java.io.EOFException;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.nio.ByteBuffer;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.locks.Condition;
    +import java.util.concurrent.locks.ReentrantLock;
    +
    +/**
    + * {@link InputStream} implementation which asynchronously reads ahead 
from the underlying input
    + * stream when specified amount of data has been read from the current 
buffer. It does it by maintaining
    + * two buffer - active buffer and read ahead buffer. Active buffer 
contains data which should be returned
    + * when a read() call is issued. The read ahead buffer is used to 
asynchronously read from the underlying
    + * input stream and once the current active buffer is exhausted, we flip 
the two buffers so that we can
    + * start reading from the read ahead buffer without being blocked in disk 
I/O.
    + */
    +public class ReadAheadInputStream extends InputStream {
    +
    +  private ReentrantLock stateChangeLock = new ReentrantLock();
    +
    +  @GuardedBy("stateChangeLock")
    +  private ByteBuffer activeBuffer;
    +
    +  @GuardedBy("stateChangeLock")
    +  private ByteBuffer readAheadBuffer;
    +
    +  @GuardedBy("stateChangeLock")
    +  private boolean endOfStream;
    +
    +  @GuardedBy("stateChangeLock")
    +  // true if async read is in progress
    +  private boolean readInProgress;
    +
    +  @GuardedBy("stateChangeLock")
    +  // true if read is aborted due to an exception in reading from 
underlying input stream.
    +  private boolean readAborted;
    +
    +  @GuardedBy("stateChangeLock")
    +  private Exception readException;
    +
    +  // If the remaining data size in the current buffer is below this 
threshold,
    +  // we issue an async read from the underlying input stream.
    +  private final int readAheadThresholdInBytes;
    +
    +  private final InputStream underlyingInputStream;
    +
    +  private final ExecutorService executorService = 
ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");
    +
    +  private final Condition asyncReadComplete = 
stateChangeLock.newCondition();
    +
    +  private static final ThreadLocal<byte[]> oneByte = 
ThreadLocal.withInitial(() -> new byte[1]);
    +
    +  /**
    +   * Creates a <code>ReadAheadInputStream</code> with the specified buffer 
size and read-ahead
    +   * threshold
    +   *
    +   * @param   inputStream                 The underlying input stream.
    +   * @param   bufferSizeInBytes           The buffer size.
    +   * @param   readAheadThresholdInBytes   If the active buffer has less 
data than the read-ahead
    +   *                                      threshold, an async read is 
triggered.
    +   */
    +  public ReadAheadInputStream(InputStream inputStream, int 
bufferSizeInBytes, int readAheadThresholdInBytes) {
    +    Preconditions.checkArgument(bufferSizeInBytes > 0,
    +            "bufferSizeInBytes should be greater than 0");
    +    Preconditions.checkArgument(readAheadThresholdInBytes > 0 &&
    +                    readAheadThresholdInBytes < bufferSizeInBytes,
    +            "readAheadThresholdInBytes should be greater than 0 and less 
than bufferSizeInBytes" );
    +    activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
    +    readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
    +    this.readAheadThresholdInBytes = readAheadThresholdInBytes;
    +    this.underlyingInputStream = inputStream;
    +    activeBuffer.flip();
    +    readAheadBuffer.flip();
    +  }
    +
    +  private boolean isEndOfStream() {
    +    if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && 
endOfStream) {
    +      return true;
    +    }
    +    return  false;
    --- End diff --
    
    why not just `return !activeBuffer.hasRemaining() && 
!readAheadBuffer.hasRemaining() && endOfStream`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to