This is an automated email from the ASF dual-hosted git repository.

hanishakoneru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 02c9efc  HDDS-1491. Ozone KeyInputStream seek() should not read the chunk file. (#795)
02c9efc is described below

commit 02c9efcb8174140c75d26dfd3ee0c280bde58fc8
Author: Hanisha Koneru <koneru.hani...@gmail.com>
AuthorDate: Mon May 13 20:49:52 2019 -0700

    HDDS-1491. Ozone KeyInputStream seek() should not read the chunk file. (#795)
---
 .../hadoop/hdds/scm/storage/BlockInputStream.java  | 222 +++++++++++++++------
 .../hdds/scm/storage/TestBlockInputStream.java     | 193 ++++++++++++++++++
 .../hadoop/hdds/scm/storage/package-info.java      |  21 ++
 .../hadoop/ozone/client/io/KeyInputStream.java     | 153 +++++++++-----
 .../ozone/client/rpc/TestKeyInputStream.java       | 175 ++++++++++++++++
 .../apache/hadoop/ozone/om/TestChunkStreams.java   |   4 +-
 6 files changed, 655 insertions(+), 113 deletions(-)
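As the diffstat suggests, the core of this change is a lazy seek: seek() now only records the target offset and releases any stale buffers, and the ReadChunk call to the datanode is deferred until data is actually read. The sketch below illustrates that pattern under assumed names (LazySeekStream, fetchChunk); it is a simplified model, not the Ozone classes that follow in the diff.

import java.io.IOException;

// Minimal sketch of the lazy-seek pattern: seek() is pure bookkeeping,
// the expensive fetch happens on the next read().
abstract class LazySeekStream {
  private long seekPos = -1;  // pending seek target; -1 means none
  private byte[] buffer;      // current chunk data; null until fetched
  private long bufferStart;   // stream offset of buffer[0]
  private int bufferIndex;    // read position within buffer

  // Stands in for the ReadChunk RPC; assumed to return the data starting
  // at pos, or an empty array at end of stream.
  protected abstract byte[] fetchChunk(long pos) throws IOException;

  public void seek(long pos) {
    seekPos = pos;  // record the position only; no I/O here
    buffer = null;  // drop stale buffers; refetched lazily by read()
  }

  public int read() throws IOException {
    if (buffer == null || bufferIndex >= buffer.length) {
      long target = (seekPos >= 0) ? seekPos : bufferStart + bufferIndex;
      byte[] next = fetchChunk(target);  // I/O happens only here
      if (next.length == 0) {
        return -1;  // no more data
      }
      buffer = next;
      bufferStart = target;
      bufferIndex = 0;
      seekPos = -1;
    }
    return buffer[bufferIndex++] & 0xFF;
  }
}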

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index 6667163..bb4a5b0 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdds.scm.storage;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -61,10 +62,18 @@ public class BlockInputStream extends InputStream implements Seekable {
   private XceiverClientManager xceiverClientManager;
   private XceiverClientSpi xceiverClient;
   private List<ChunkInfo> chunks;
+  // chunkIndex is the index of the current chunk in the buffers, or the
+  // index of the chunk that will be read next into the buffers by
+  // readChunkFromContainer().
   private int chunkIndex;
+  // chunkIndexOfCurrentBuffer is the index of the chunk that was last read
+  // from the container into the buffers. It is updated only when a new
+  // chunk is read from the container into the buffers.
+  private int chunkIndexOfCurrentBuffer;
   private long[] chunkOffset;
   private List<ByteBuffer> buffers;
   private int bufferIndex;
+  private long bufferPosition;
   private final boolean verifyChecksum;
 
   /**
@@ -76,24 +85,34 @@ public class BlockInputStream extends InputStream implements Seekable {
    * @param chunks list of chunks to read
    * @param traceID container protocol call traceID
    * @param verifyChecksum verify checksum
+   * @param initialPosition the initial position of the stream pointer. The
+   *                        stream is seeked to this position now if the
+   *                        upstream was seeked before this stream was created.
    */
   public BlockInputStream(
       BlockID blockID, XceiverClientManager xceiverClientManager,
       XceiverClientSpi xceiverClient, List<ChunkInfo> chunks, String traceID,
-      boolean verifyChecksum) {
+      boolean verifyChecksum, long initialPosition) throws IOException {
     this.blockID = blockID;
     this.traceID = traceID;
     this.xceiverClientManager = xceiverClientManager;
     this.xceiverClient = xceiverClient;
     this.chunks = chunks;
-    this.chunkIndex = -1;
+    this.chunkIndex = 0;
+    this.chunkIndexOfCurrentBuffer = -1;
     // chunkOffset[i] stores offset at which chunk i stores data in
     // BlockInputStream
     this.chunkOffset = new long[this.chunks.size()];
     initializeChunkOffset();
     this.buffers = null;
     this.bufferIndex = 0;
+    this.bufferPosition = -1;
     this.verifyChecksum = verifyChecksum;
+    if (initialPosition > 0) {
+      // The stream was seeked to a position before it was initialized.
+      // Seek to that position now.
+      seek(initialPosition);
+    }
   }
 
   private void initializeChunkOffset() {
@@ -176,7 +195,7 @@ public class BlockInputStream extends InputStream implements Seekable {
    *
    * @return true if EOF, false if more data is available
    */
-  private boolean blockStreamEOF() {
+  protected boolean blockStreamEOF() {
     if (buffersHaveData() || chunksRemaining()) {
       return false;
     } else {
@@ -223,12 +242,19 @@ public class BlockInputStream extends InputStream implements Seekable {
    */
   private synchronized int prepareRead(int len) throws IOException {
     for (;;) {
+      if (!buffersAllocated()) {
+        // The current chunk at chunkIndex has not been read from the
+        // container. Read the chunk and put the data into buffers.
+        readChunkFromContainer();
+      }
       if (buffersHaveData()) {
         // Data is available from buffers
         ByteBuffer bb = buffers.get(bufferIndex);
         return len > bb.remaining() ? bb.remaining() : len;
       } else if (chunksRemaining()) {
         // There are additional chunks available.
+        // Read the next chunk in the block.
+        chunkIndex += 1;
         readChunkFromContainer();
       } else {
         // All available input has been consumed.
@@ -237,26 +263,31 @@ public class BlockInputStream extends InputStream implements Seekable {
     }
   }
 
-  private boolean buffersHaveData() {
-    boolean hasData = false;
-
+  private boolean buffersAllocated() {
     if (buffers == null || buffers.isEmpty()) {
       return false;
     }
+    return true;
+  }
 
-    while (bufferIndex < (buffers.size())) {
-      if (buffers.get(bufferIndex).hasRemaining()) {
-        // current buffer has data
-        hasData = true;
-        break;
-      } else {
-        if (buffersRemaining()) {
-          // move to next available buffer
-          ++bufferIndex;
-          Preconditions.checkState(bufferIndex < buffers.size());
-        } else {
-          // no more buffers remaining
+  private boolean buffersHaveData() {
+    boolean hasData = false;
+
+    if (buffersAllocated()) {
+      while (bufferIndex < (buffers.size())) {
+        if (buffers.get(bufferIndex).hasRemaining()) {
+          // current buffer has data
+          hasData = true;
           break;
+        } else {
+          if (buffersRemaining()) {
+            // move to next available buffer
+            ++bufferIndex;
+            Preconditions.checkState(bufferIndex < buffers.size());
+          } else {
+            // no more buffers remaining
+            break;
+          }
         }
       }
     }
@@ -272,7 +303,14 @@ public class BlockInputStream extends InputStream implements Seekable {
     if ((chunks == null) || chunks.isEmpty()) {
       return false;
     }
-    return (chunkIndex < (chunks.size() - 1));
+    // Check if more chunks are remaining in the stream after chunkIndex
+    if (chunkIndex < (chunks.size() - 1)) {
+      return true;
+    }
+    // chunkIndex points to the last chunk in the stream. Return true if
+    // this chunk has not yet been read from the container into the buffers,
+    // and false otherwise.
+    return chunkIndexOfCurrentBuffer != chunkIndex;
   }
 
   /**
@@ -283,34 +321,14 @@ public class BlockInputStream extends InputStream implements Seekable {
    * @throws IOException if there is an I/O error while performing the call
    */
   private synchronized void readChunkFromContainer() throws IOException {
-    // On every chunk read chunkIndex should be increased so as to read the
-    // next chunk
-    chunkIndex += 1;
-    XceiverClientReply reply;
-    ReadChunkResponseProto readChunkResponse = null;
+    // Read the chunk at chunkIndex
     final ChunkInfo chunkInfo = chunks.get(chunkIndex);
     List<DatanodeDetails> excludeDns = null;
     ByteString byteString;
-    List<DatanodeDetails> dnList = xceiverClient.getPipeline().getNodes();
+    List<DatanodeDetails> dnList = getDatanodeList();
     while (true) {
-      try {
-        reply = ContainerProtocolCalls
-            .readChunk(xceiverClient, chunkInfo, blockID, traceID, excludeDns);
-        ContainerProtos.ContainerCommandResponseProto response;
-        response = reply.getResponse().get();
-        ContainerProtocolCalls.validateContainerResponse(response);
-        readChunkResponse = response.getReadChunk();
-      } catch (IOException e) {
-        if (e instanceof StorageContainerException) {
-          throw e;
-        }
-        throw new IOException("Unexpected OzoneException: " + e.toString(), e);
-      } catch (ExecutionException | InterruptedException e) {
-        throw new IOException(
-            "Failed to execute ReadChunk command for chunk  " + chunkInfo
-                .getChunkName(), e);
-      }
-      byteString = readChunkResponse.getData();
+      List<DatanodeDetails> dnListFromReadChunkCall = new ArrayList<>();
+      byteString = readChunk(chunkInfo, excludeDns, dnListFromReadChunkCall);
       try {
         if (byteString.size() != chunkInfo.getLen()) {
           // Bytes read from chunk should be equal to chunk size.
@@ -333,7 +351,7 @@ public class BlockInputStream extends InputStream implements Seekable {
         if (excludeDns == null) {
           excludeDns = new ArrayList<>();
         }
-        excludeDns.addAll(reply.getDatanodes());
+        excludeDns.addAll(dnListFromReadChunkCall);
         if (excludeDns.size() == dnList.size()) {
           throw ioe;
         }
@@ -342,6 +360,47 @@ public class BlockInputStream extends InputStream implements Seekable {
 
     buffers = byteString.asReadOnlyByteBufferList();
     bufferIndex = 0;
+    chunkIndexOfCurrentBuffer = chunkIndex;
+
+    // The bufferIndex and position might need to be adjusted if seek() was
+    // called on the stream earlier, so that the buffer position can be
+    // advanced to the seeked position.
+    adjustBufferIndex();
+  }
+
+  /**
+   * Send RPC call to get the chunk from the container.
+   */
+  @VisibleForTesting
+  protected ByteString readChunk(final ChunkInfo chunkInfo,
+      List<DatanodeDetails> excludeDns, List<DatanodeDetails> dnListFromReply)
+      throws IOException {
+    XceiverClientReply reply;
+    ReadChunkResponseProto readChunkResponse = null;
+    try {
+      reply = ContainerProtocolCalls
+          .readChunk(xceiverClient, chunkInfo, blockID, traceID, excludeDns);
+      ContainerProtos.ContainerCommandResponseProto response;
+      response = reply.getResponse().get();
+      ContainerProtocolCalls.validateContainerResponse(response);
+      readChunkResponse = response.getReadChunk();
+      dnListFromReply.addAll(reply.getDatanodes());
+    } catch (IOException e) {
+      if (e instanceof StorageContainerException) {
+        throw e;
+      }
+      throw new IOException("Unexpected OzoneException: " + e.toString(), e);
+    } catch (ExecutionException | InterruptedException e) {
+      throw new IOException(
+          "Failed to execute ReadChunk command for chunk  " + chunkInfo
+              .getChunkName(), e);
+    }
+    return readChunkResponse.getData();
+  }
+
+  @VisibleForTesting
+  protected List<DatanodeDetails> getDatanodeList() {
+    return xceiverClient.getPipeline().getNodes();
   }
 
   @Override
@@ -352,9 +411,8 @@ public class BlockInputStream extends InputStream implements Seekable {
       throw new EOFException("EOF encountered pos: " + pos + " container key: "
           + blockID.getLocalID());
     }
-    if (chunkIndex == -1) {
-      chunkIndex = Arrays.binarySearch(chunkOffset, pos);
-    } else if (pos < chunkOffset[chunkIndex]) {
+
+    if (pos < chunkOffset[chunkIndex]) {
       chunkIndex = Arrays.binarySearch(chunkOffset, 0, chunkIndex, pos);
     } else if (pos >= chunkOffset[chunkIndex] + chunks.get(chunkIndex)
         .getLen()) {
@@ -368,40 +426,71 @@ public class BlockInputStream extends InputStream implements Seekable {
       // accordingly so that chunkIndex = insertionPoint - 1
       chunkIndex = -chunkIndex -2;
     }
-    // adjust chunkIndex so that readChunkFromContainer reads the correct chunk
-    chunkIndex -= 1;
-    readChunkFromContainer();
-    adjustBufferIndex(pos);
+
+    // The bufferPosition should be adjusted to account for the chunk offset
+    // of the chunk that pos actually points to.
+    bufferPosition = pos - chunkOffset[chunkIndex];
+
+    // Check if current buffers correspond to the chunk index being seeked
+    // and if the buffers have any data.
+    if (chunkIndex == chunkIndexOfCurrentBuffer && buffersAllocated()) {
+      // Position the buffer to the seeked position.
+      adjustBufferIndex();
+    } else {
+      // Release the current buffers. The next readChunkFromContainer will
+      // read the required chunk and position the buffer to the seeked
+      // position.
+      releaseBuffers();
+    }
   }
 
-  private void adjustBufferIndex(long pos) {
-    long tempOffest = chunkOffset[chunkIndex];
+  private void adjustBufferIndex() {
+    if (bufferPosition == -1) {
+      // The stream has not been seeked to a position. No need to adjust the
+      // buffer Index and position.
+      return;
+    }
+    // The bufferPosition is w.r.t. the buffers for the current chunk.
+    // Adjust the bufferIndex and position to the seeked position.
+    long tempOffest = 0;
     for (int i = 0; i < buffers.size(); i++) {
-      if (pos - tempOffest >= buffers.get(i).capacity()) {
+      if (bufferPosition - tempOffest >= buffers.get(i).capacity()) {
         tempOffest += buffers.get(i).capacity();
       } else {
         bufferIndex = i;
         break;
       }
     }
-    buffers.get(bufferIndex).position((int) (pos - tempOffest));
+    buffers.get(bufferIndex).position((int) (bufferPosition - tempOffest));
+    // Reset the bufferPosition as the seek() operation has been completed.
+    bufferPosition = -1;
   }
 
   @Override
   public synchronized long getPos() throws IOException {
-    if (chunkIndex == -1) {
-      // no data consumed yet, a new stream OR after seek
-      return 0;
-    }
-
-    if (blockStreamEOF()) {
+    // position = chunkOffset of current chunk (at chunkIndex) + position of
+    // the buffer corresponding to the chunk.
+    long bufferPos = 0;
+
+    if (bufferPosition >= 0) {
+      // seek has been called but the buffers were empty. Hence, the buffer
+      // position will be advanced after the buffers are filled.
+      // We return the chunkOffset + bufferPosition here as that will be the
+      // position of the buffer pointer after reading the chunk file.
+      bufferPos = bufferPosition;
+
+    } else if (blockStreamEOF()) {
       // all data consumed, buffers have been released.
       // get position from the chunk offset and chunk length of last chunk
-      return chunkOffset[chunkIndex] + chunks.get(chunkIndex).getLen();
+      bufferPos = chunks.get(chunkIndex).getLen();
+
+    } else if (buffersAllocated()) {
+      // get position from available buffers of current chunk
+      bufferPos = buffers.get(bufferIndex).position();
+
     }
 
-    // get position from available buffers of current chunk
-    return chunkOffset[chunkIndex] + buffers.get(bufferIndex).position();
+    return chunkOffset[chunkIndex] + bufferPos;
   }
 
   @Override
@@ -412,4 +501,9 @@ public class BlockInputStream extends InputStream implements Seekable {
   public BlockID getBlockID() {
     return blockID;
   }
+
+  @VisibleForTesting
+  protected int getChunkIndex() {
+    return chunkIndex;
+  }
 }
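The seek() logic above leans on Arrays.binarySearch over chunkOffset: an exact hit returns the chunk index directly, while a miss returns -(insertionPoint) - 1, which the line chunkIndex = -chunkIndex - 2 maps back to the chunk containing pos. A standalone demonstration of that arithmetic, with illustrative values only:

import java.util.Arrays;

public class ChunkIndexDemo {
  public static void main(String[] args) {
    // chunkOffset[i] is the stream offset at which chunk i begins,
    // here four 20-byte chunks.
    long[] chunkOffset = {0, 20, 40, 60};
    long pos = 45;  // falls inside chunk 2

    int chunkIndex = Arrays.binarySearch(chunkOffset, pos);
    if (chunkIndex < 0) {
      // binarySearch returned -(insertionPoint) - 1; the chunk
      // containing pos is insertionPoint - 1, i.e. -chunkIndex - 2.
      chunkIndex = -chunkIndex - 2;
    }
    System.out.println("chunkIndex = " + chunkIndex);  // prints 2
  }
}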
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
new file mode 100644
index 0000000..35c1022
--- /dev/null
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.storage;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ContainerBlockID;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ChecksumData;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ChecksumType;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+/**
+ * Tests {@link BlockInputStream}.
+ */
+public class TestBlockInputStream {
+
+  private static BlockInputStream blockInputStream;
+  private static List<ChunkInfo> chunks;
+  private static int blockSize;
+
+  private static final int CHUNK_SIZE = 20;
+
+  @Before
+  public void setup() throws Exception {
+    BlockID blockID = new BlockID(new ContainerBlockID(1, 1));
+    chunks = createChunkList(10);
+    String traceID = UUID.randomUUID().toString();
+    blockInputStream = new DummyBlockInputStream(blockID, null, null, chunks,
+        traceID, false, 0);
+
+    blockSize = 0;
+    for (ChunkInfo chunk : chunks) {
+      blockSize += chunk.getLen();
+    }
+  }
+
+  /**
+   * Create a mock list of chunks. The first numChunks - 1 chunks have
+   * length CHUNK_SIZE and the last chunk has length CHUNK_SIZE / 2.
+   * @param numChunks number of chunks in the list
+   * @return the list of mock ChunkInfo objects
+   */
+  private static List<ChunkInfo> createChunkList(int numChunks) {
+    ChecksumData dummyChecksumData = ChecksumData.newBuilder()
+        .setType(ChecksumType.NONE)
+        .setBytesPerChecksum(100)
+        .build();
+    List<ChunkInfo> chunkList = new ArrayList<>(numChunks);
+    int i;
+    for (i = 0; i < numChunks - 1; i++) {
+      String chunkName = "chunk-" + i;
+      ChunkInfo chunkInfo = ChunkInfo.newBuilder()
+          .setChunkName(chunkName)
+          .setOffset(0)
+          .setLen(CHUNK_SIZE)
+          .setChecksumData(dummyChecksumData)
+          .build();
+      chunkList.add(chunkInfo);
+    }
+    ChunkInfo chunkInfo = ChunkInfo.newBuilder()
+        .setChunkName("chunk-" + i)
+        .setOffset(0)
+        .setLen(CHUNK_SIZE/2)
+        .setChecksumData(dummyChecksumData)
+        .build();
+    chunkList.add(chunkInfo);
+
+    return chunkList;
+  }
+
+  /**
+   * A dummy BlockInputStream to test the functionality of BlockInputStream.
+   */
+  private static class DummyBlockInputStream extends BlockInputStream {
+
+    DummyBlockInputStream(BlockID blockID,
+        XceiverClientManager xceiverClientManager,
+        XceiverClientSpi xceiverClient,
+        List<ChunkInfo> chunks,
+        String traceID,
+        boolean verifyChecksum,
+        long initialPosition) throws IOException {
+      super(blockID, xceiverClientManager, xceiverClient, chunks, traceID,
+          verifyChecksum, initialPosition);
+    }
+
+    @Override
+    protected ByteString readChunk(final ChunkInfo chunkInfo,
+        List<DatanodeDetails> excludeDns, List<DatanodeDetails> dnListFromReply)
+        throws IOException {
+      return getByteString(chunkInfo.getChunkName(), (int) chunkInfo.getLen());
+    }
+
+    @Override
+    protected List<DatanodeDetails> getDatanodeList() {
+      // return an empty dummy list with an initial capacity of 10
+      return new ArrayList<>(10);
+    }
+
+    /**
+     * Create a ByteString with the input data to return when a readChunk
+     * call is made.
+     */
+    private static ByteString getByteString(String data, int length) {
+      while (data.length() < length) {
+        data = data + "0";
+      }
+      return ByteString.copyFrom(data.getBytes(), 0, length);
+    }
+  }
+
+  @Test
+  public void testSeek() throws Exception {
+    // Seek to position 0
+    int pos = 0;
+    seekAndVerify(pos);
+    Assert.assertEquals("ChunkIndex is incorrect", 0,
+        blockInputStream.getChunkIndex());
+
+    pos = CHUNK_SIZE;
+    seekAndVerify(pos);
+    Assert.assertEquals("ChunkIndex is incorrect", 1,
+        blockInputStream.getChunkIndex());
+
+    pos = (CHUNK_SIZE * 5) + 5;
+    seekAndVerify(pos);
+    Assert.assertEquals("ChunkIndex is incorrect", 5,
+        blockInputStream.getChunkIndex());
+
+    try {
+      // Try seeking beyond the blockSize.
+      pos = blockSize + 10;
+      seekAndVerify(pos);
+      Assert.fail("Seek to position beyond block size should fail.");
+    } catch (EOFException e) {
+      // Expected
+    }
+
+    // Seek to random positions between 0 and the block size.
+    Random random = new Random();
+    for (int i = 0; i < 10; i++) {
+      pos = random.nextInt(blockSize);
+      seekAndVerify(pos);
+    }
+  }
+
+  @Test
+  public void testBlockEOF() throws Exception {
+    // Seek to some position < blockSize and verify EOF is not reached.
+    seekAndVerify(CHUNK_SIZE);
+    Assert.assertFalse(blockInputStream.blockStreamEOF());
+
+    // Seek to blockSize-1 and verify that EOF is not reached as the chunk
+    // has not been read from container yet.
+    seekAndVerify(blockSize-1);
+    Assert.assertFalse(blockInputStream.blockStreamEOF());
+  }
+
+  private void seekAndVerify(int pos) throws Exception {
+    blockInputStream.seek(pos);
+    Assert.assertEquals("Current position of buffer does not match with the " +
+            "seeked position", pos, blockInputStream.getPos());
+  }
+}
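DummyBlockInputStream works because readChunk() and getDatanodeList() were widened to protected and marked @VisibleForTesting, so a subclass can stub out all network I/O. A condensed sketch of that seam, with hypothetical names standing in for the Ozone classes:

import java.io.IOException;

// The production class routes all I/O through one overridable method...
abstract class NetworkBackedReader {
  protected byte[] readChunk(int index) throws IOException {
    throw new IOException("would issue a ReadChunk RPC here");
  }

  public int firstByteOfChunk(int index) throws IOException {
    byte[] data = readChunk(index);
    return data.length == 0 ? -1 : (data[0] & 0xFF);
  }
}

// ...which a test subclass overrides with canned data, no datanodes needed.
class StubbedReader extends NetworkBackedReader {
  @Override
  protected byte[] readChunk(int index) {
    return new byte[] {(byte) index};
  }
}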
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/package-info.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/package-info.java
new file mode 100644
index 0000000..abdd04e
--- /dev/null
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * This package contains Ozone InputStream related tests.
+ */
+package org.apache.hadoop.hdds.scm.storage;
\ No newline at end of file
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
index 3a92e01..5b63420 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
@@ -84,11 +92,28 @@ public class KeyInputStream extends InputStream implements Seekable {
    * @param streamLength the max number of bytes that should be written to this
    *                     stream.
    */
+  @VisibleForTesting
   public synchronized void addStream(BlockInputStream stream,
       long streamLength) {
     streamEntries.add(new ChunkInputStreamEntry(stream, streamLength));
   }
 
+  /**
+   * Append another ChunkInputStreamEntry to the end of the list.
+   * The stream will be constructed from the input information when it needs
+   * to be accessed.
+   */
+  private synchronized void addStream(OmKeyLocationInfo omKeyLocationInfo,
+      XceiverClientManager xceiverClientMngr, String clientRequestId,
+      boolean verifyChecksum) {
+    streamEntries.add(new ChunkInputStreamEntry(omKeyLocationInfo,
+        xceiverClientMngr, clientRequestId, verifyChecksum));
+  }
+
+  private synchronized ChunkInputStreamEntry getStreamEntry(int index)
+      throws IOException {
+    return streamEntries.get(index).getStream();
+  }
 
   @Override
   public synchronized int read() throws IOException {
@@ -120,7 +137,7 @@ public class KeyInputStream extends InputStream implements Seekable {
                               .getRemaining() == 0)) {
         return totalReadLen == 0 ? EOF : totalReadLen;
       }
-      ChunkInputStreamEntry current = streamEntries.get(currentStreamIndex);
+      ChunkInputStreamEntry current = getStreamEntry(currentStreamIndex);
       int numBytesToRead = Math.min(len, (int)current.getRemaining());
       int numBytesRead = current.read(b, off, numBytesToRead);
       if (numBytesRead != numBytesToRead) {
@@ -212,13 +229,81 @@ public class KeyInputStream extends InputStream implements Seekable {
   public static class ChunkInputStreamEntry extends InputStream
       implements Seekable {
 
-    private final BlockInputStream blockInputStream;
+    private BlockInputStream blockInputStream;
+    private final OmKeyLocationInfo blockLocationInfo;
     private final long length;
+    private final XceiverClientManager xceiverClientManager;
+    private final String requestId;
+    private boolean verifyChecksum;
+
+    // The position of the blockInputStream is maintained by this variable
+    // until the stream is initialized.
+    private long position;
+
+    public ChunkInputStreamEntry(OmKeyLocationInfo omKeyLocationInfo,
+        XceiverClientManager xceiverClientMngr, String clientRequestId,
+        boolean verifyChecksum) {
+      this.blockLocationInfo = omKeyLocationInfo;
+      this.length = omKeyLocationInfo.getLength();
+      this.xceiverClientManager = xceiverClientMngr;
+      this.requestId = clientRequestId;
+      this.verifyChecksum = verifyChecksum;
+    }
 
+    @VisibleForTesting
     public ChunkInputStreamEntry(BlockInputStream blockInputStream,
         long length) {
       this.blockInputStream = blockInputStream;
       this.length = length;
+      this.blockLocationInfo = null;
+      this.xceiverClientManager = null;
+      this.requestId = null;
+    }
+
+    private ChunkInputStreamEntry getStream() throws IOException {
+      if (this.blockInputStream == null) {
+        initializeBlockInputStream();
+      }
+      return this;
+    }
+
+    private void initializeBlockInputStream() throws IOException {
+      BlockID blockID = blockLocationInfo.getBlockID();
+      long containerID = blockID.getContainerID();
+      Pipeline pipeline = blockLocationInfo.getPipeline();
+
+      // irrespective of the container state, we will always read via Standalone
+      // protocol.
+      if (pipeline.getType() != HddsProtos.ReplicationType.STAND_ALONE) {
+        pipeline = Pipeline.newBuilder(pipeline)
+            .setType(HddsProtos.ReplicationType.STAND_ALONE).build();
+      }
+      XceiverClientSpi xceiverClient = xceiverClientManager
+          .acquireClient(pipeline);
+      boolean success = false;
+      long containerKey = blockLocationInfo.getLocalID();
+      try {
+        LOG.debug("Initializing stream for get key to access {} {}",
+            containerID, containerKey);
+        ContainerProtos.DatanodeBlockID datanodeBlockID = blockID
+            .getDatanodeBlockIDProtobuf();
+        if (blockLocationInfo.getToken() != null) {
+          UserGroupInformation.getCurrentUser().
+              addToken(blockLocationInfo.getToken());
+        }
+        ContainerProtos.GetBlockResponseProto response = ContainerProtocolCalls
+            .getBlock(xceiverClient, datanodeBlockID, requestId);
+        List<ContainerProtos.ChunkInfo> chunks =
+            response.getBlockData().getChunksList();
+        success = true;
+        this.blockInputStream = new BlockInputStream(
+            blockLocationInfo.getBlockID(), xceiverClientManager, xceiverClient,
+            chunks, requestId, verifyChecksum, position);
+      } finally {
+        if (!success) {
+          xceiverClientManager.releaseClient(xceiverClient, false);
+        }
+      }
     }
 
     synchronized long getRemaining() throws IOException {
@@ -240,17 +325,27 @@ public class KeyInputStream extends InputStream implements Seekable {
 
     @Override
     public synchronized void close() throws IOException {
-      blockInputStream.close();
+      if (blockInputStream != null) {
+        blockInputStream.close();
+      }
     }
 
     @Override
     public void seek(long pos) throws IOException {
-      blockInputStream.seek(pos);
+      if (blockInputStream != null) {
+        blockInputStream.seek(pos);
+      } else {
+        position = pos;
+      }
     }
 
     @Override
     public long getPos() throws IOException {
-      return blockInputStream.getPos();
+      if (blockInputStream != null) {
+        return blockInputStream.getPos();
+      } else {
+        return position;
+      }
     }
 
     @Override
@@ -266,7 +361,6 @@ public class KeyInputStream extends InputStream implements Seekable {
           storageContainerLocationClient,
       String requestId, boolean verifyChecksum) throws IOException {
     long length = 0;
-    long containerKey;
     KeyInputStream groupInputStream = new KeyInputStream();
     groupInputStream.key = keyInfo.getKeyName();
     List<OmKeyLocationInfo> keyLocationInfos =
@@ -274,48 +368,13 @@ public class KeyInputStream extends InputStream implements Seekable {
     groupInputStream.streamOffset = new long[keyLocationInfos.size()];
     for (int i = 0; i < keyLocationInfos.size(); i++) {
       OmKeyLocationInfo omKeyLocationInfo = keyLocationInfos.get(i);
-      BlockID blockID = omKeyLocationInfo.getBlockID();
-      long containerID = blockID.getContainerID();
-      Pipeline pipeline = omKeyLocationInfo.getPipeline();
+      LOG.debug("Adding stream for accessing {}. The stream will be " +
+          "initialized later.", omKeyLocationInfo);
+      groupInputStream.addStream(omKeyLocationInfo, xceiverClientManager,
+          requestId, verifyChecksum);
 
-      // irrespective of the container state, we will always read via Standalone
-      // protocol.
-      if (pipeline.getType() != HddsProtos.ReplicationType.STAND_ALONE) {
-        pipeline = Pipeline.newBuilder(pipeline)
-            .setType(HddsProtos.ReplicationType.STAND_ALONE).build();
-      }
-      XceiverClientSpi xceiverClient = xceiverClientManager
-          .acquireClient(pipeline);
-      boolean success = false;
-      containerKey = omKeyLocationInfo.getLocalID();
-      try {
-        LOG.debug("get key accessing {} {}",
-            containerID, containerKey);
-        groupInputStream.streamOffset[i] = length;
-        ContainerProtos.DatanodeBlockID datanodeBlockID = blockID
-            .getDatanodeBlockIDProtobuf();
-        if (omKeyLocationInfo.getToken() != null) {
-          UserGroupInformation.getCurrentUser().
-              addToken(omKeyLocationInfo.getToken());
-        }
-        ContainerProtos.GetBlockResponseProto response = ContainerProtocolCalls
-            .getBlock(xceiverClient, datanodeBlockID, requestId);
-        List<ContainerProtos.ChunkInfo> chunks =
-            response.getBlockData().getChunksList();
-        for (ContainerProtos.ChunkInfo chunk : chunks) {
-          length += chunk.getLen();
-        }
-        success = true;
-        BlockInputStream inputStream = new BlockInputStream(
-            omKeyLocationInfo.getBlockID(), xceiverClientManager, xceiverClient,
-            chunks, requestId, verifyChecksum);
-        groupInputStream.addStream(inputStream,
-            omKeyLocationInfo.getLength());
-      } finally {
-        if (!success) {
-          xceiverClientManager.releaseClient(xceiverClient, false);
-        }
-      }
+      groupInputStream.streamOffset[i] = length;
+      length += omKeyLocationInfo.getLength();
     }
     groupInputStream.length = length;
     return new LengthInputStream(groupInputStream, length);
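Because block lengths are now taken directly from OmKeyLocationInfo, opening a key no longer contacts any datanode. A hedged usage sketch of the resulting behavior (the method and key name are illustrative; the readKey()/getInputStream() calls mirror the integration test below):

import java.io.IOException;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.io.KeyInputStream;

public final class LazySeekUsage {
  // Open a key, seek, and read one byte. After this patch, the seek()
  // issues no GetBlock or ReadChunk calls; the first RPCs happen in read().
  static int readByteAt(OzoneBucket bucket, String keyName, long pos)
      throws IOException {
    try (KeyInputStream in =
        (KeyInputStream) bucket.readKey(keyName).getInputStream()) {
      in.seek(pos);      // bookkeeping only; no datanode I/O
      return in.read();  // GetBlock and ReadChunk are deferred to here
    }
  }
}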
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
new file mode 100644
index 0000000..fa8a289
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.client.rpc;
+
+import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.KeyInputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+
+/**
+ * Tests {@link KeyInputStream}.
+ */
+public class TestKeyInputStream {
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration conf = new OzoneConfiguration();
+  private static OzoneClient client;
+  private static ObjectStore objectStore;
+  private static int chunkSize;
+  private static int flushSize;
+  private static int maxFlushSize;
+  private static int blockSize;
+  private static String volumeName;
+  private static String bucketName;
+  private static String keyString;
+
+  /**
+   * Create a MiniOzoneCluster for testing.
+   *
+   * @throws IOException
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    chunkSize = 100;
+    flushSize = 4 * chunkSize;
+    maxFlushSize = 2 * flushSize;
+    blockSize = 2 * maxFlushSize;
+    conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
+    conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+    conf.setQuietMode(false);
+    conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
+        StorageUnit.MB);
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(3)
+        .setBlockSize(blockSize)
+        .setChunkSize(chunkSize)
+        .setStreamBufferFlushSize(flushSize)
+        .setStreamBufferMaxSize(maxFlushSize)
+        .setStreamBufferSizeUnit(StorageUnit.BYTES)
+        .build();
+    cluster.waitForClusterToBeReady();
+    //the easiest way to create an open container is creating a key
+    client = OzoneClientFactory.getClient(conf);
+    objectStore = client.getObjectStore();
+    keyString = UUID.randomUUID().toString();
+    volumeName = "test-key-input-stream-volume";
+    bucketName = "test-key-input-stream-bucket";
+    objectStore.createVolume(volumeName);
+    objectStore.getVolume(volumeName).createBucket(bucketName);
+  }
+
+  /**
+   * Shutdown the MiniOzoneCluster.
+   */
+  @AfterClass
+  public static void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  private String getKeyName() {
+    return UUID.randomUUID().toString();
+  }
+
+  private OzoneOutputStream createKey(String keyName, ReplicationType type,
+      long size) throws Exception {
+    return ContainerTestHelper
+        .createKey(keyName, type, size, objectStore, volumeName, bucketName);
+  }
+
+  @Test
+  public void testSeek() throws Exception {
+    XceiverClientMetrics metrics = XceiverClientManager
+        .getXceiverClientMetrics();
+    long writeChunkCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long readChunkCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.ReadChunk);
+
+    String keyName = getKeyName();
+    OzoneOutputStream key = ContainerTestHelper.createKey(keyName,
+        ReplicationType.RATIS, 0, objectStore, volumeName, bucketName);
+
+    // write data spanning 3 chunks
+    int dataLength = (2 * chunkSize) + (chunkSize / 2);
+    byte[] inputData = ContainerTestHelper.getFixedLengthString(
+        keyString, dataLength).getBytes(UTF_8);
+    key.write(inputData);
+    key.close();
+
+    Assert.assertEquals(writeChunkCount + 3,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+
+    KeyInputStream keyInputStream = (KeyInputStream) objectStore
+        .getVolume(volumeName).getBucket(bucketName).readKey(keyName)
+        .getInputStream();
+
+    // Seek to position 150
+    keyInputStream.seek(150);
+
+    Assert.assertEquals(150, keyInputStream.getPos());
+
+    // Seek operation should not result in any readChunk operation.
+    Assert.assertEquals(readChunkCount, metrics
+        .getContainerOpsMetrics(ContainerProtos.Type.ReadChunk));
+    Assert.assertEquals(readChunkCount, metrics
+        .getContainerOpCountMetrics(ContainerProtos.Type.ReadChunk));
+
+    byte[] readData = new byte[chunkSize];
+    keyInputStream.read(readData, 0, chunkSize);
+
+    // Since we are reading data from offset 150 to 250 and the chunk
+    // boundary is at every 100 bytes, we need to read 2 chunks.
+    Assert.assertEquals(readChunkCount + 2,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.ReadChunk));
+
+    keyInputStream.close();
+
+    // Verify that the data read matches the input data at the corresponding
+    // indices.
+    for (int i = 0; i < chunkSize; i++) {
+      Assert.assertEquals(inputData[chunkSize + 50 + i], readData[i]);
+    }
+  }
+}
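The expected counts in testSeek follow from simple chunk arithmetic: with 100-byte chunks, a 100-byte read starting at offset 150 spans chunks 1 and 2, hence exactly two ReadChunk calls. A quick worked check:

public class ChunkSpanDemo {
  public static void main(String[] args) {
    int chunkSize = 100;
    int offset = 150;  // seek position used in the test
    int length = 100;  // number of bytes the test reads

    int firstChunk = offset / chunkSize;                // 1
    int lastChunk = (offset + length - 1) / chunkSize;  // 2
    System.out.println("ReadChunk calls expected: "
        + (lastChunk - firstChunk + 1));                // prints 2
  }
}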
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java
index e4e449b..45f04df 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java
@@ -49,7 +49,7 @@ public class TestChunkStreams {
         int tempOffset = offset;
         BlockInputStream in =
             new BlockInputStream(null, null, null, new ArrayList<>(), null,
-                true) {
+                true, 0) {
               private long pos = 0;
               private ByteArrayInputStream in =
                   new ByteArrayInputStream(buf, tempOffset, 100);
@@ -106,7 +106,7 @@ public class TestChunkStreams {
         int tempOffset = offset;
         BlockInputStream in =
             new BlockInputStream(null, null, null, new ArrayList<>(), null,
-                true) {
+                true, 0) {
               private long pos = 0;
               private ByteArrayInputStream in =
                   new ByteArrayInputStream(buf, tempOffset, 100);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org
