http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ff1c46d/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
new file mode 100644
index 0000000..5e7cb9b
--- /dev/null
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -0,0 +1,812 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
+import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.hadoop.ozone.om.helpers.*;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.container.common.helpers
+    .StorageContainerException;
+import org.apache.hadoop.hdds.scm.protocolPB
+    .StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.ratis.protocol.RaftRetryFailureException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.ListIterator;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Maintains a list of BlockOutputStreamEntry. Writes are based on offset.
+ *
+ * Note that this may write to multiple containers in one write call. If the
+ * first container succeeds but later ones fail, the successful writes are
+ * not rolled back.
+ *
+ * TODO : multi-thread access is currently not supported.
+ */
+public class KeyOutputStream extends OutputStream {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(KeyOutputStream.class);
+
+  // array list's get(index) is O(1)
+  private final ArrayList<BlockOutputStreamEntry> streamEntries;
+  private int currentStreamIndex;
+  private final OzoneManagerProtocolClientSideTranslatorPB omClient;
+  private final
+      StorageContainerLocationProtocolClientSideTranslatorPB scmClient;
+  private final OmKeyArgs keyArgs;
+  private final long openID;
+  private final XceiverClientManager xceiverClientManager;
+  private final int chunkSize;
+  private final String requestID;
+  private boolean closed;
+  private final long streamBufferFlushSize;
+  private final long streamBufferMaxSize;
+  private final long watchTimeout;
+  private final long blockSize;
+  private final Checksum checksum;
+  private List<ByteBuffer> bufferList;
+  private OmMultipartCommitUploadPartInfo commitUploadPartInfo;
+
+  /**
+   * A constructor for testing purposes only.
+   */
+  @VisibleForTesting
+  public KeyOutputStream() {
+    streamEntries = new ArrayList<>();
+    omClient = null;
+    scmClient = null;
+    keyArgs = null;
+    openID = -1;
+    xceiverClientManager = null;
+    chunkSize = 0;
+    requestID = null;
+    closed = false;
+    streamBufferFlushSize = 0;
+    streamBufferMaxSize = 0;
+    bufferList = new ArrayList<>(1);
+    ByteBuffer buffer = ByteBuffer.allocate(1);
+    bufferList.add(buffer);
+    watchTimeout = 0;
+    blockSize = 0;
+    this.checksum = new Checksum();
+  }
+
+  /**
+   * For testing purposes only. Instead of building the output stream from
+   * blocks, it takes an externally created stream.
+   *
+   * @param outputStream an existing writable output stream
+   * @param length the length of data to write to the stream
+   */
+  @VisibleForTesting
+  public void addStream(OutputStream outputStream, long length) {
+    streamEntries.add(
+        new BlockOutputStreamEntry(outputStream, length, checksum));
+  }
+
+  @VisibleForTesting
+  public List<BlockOutputStreamEntry> getStreamEntries() {
+    return streamEntries;
+  }
+
+  @VisibleForTesting
+  public XceiverClientManager getXceiverClientManager() {
+    return xceiverClientManager;
+  }
+
+  public List<OmKeyLocationInfo> getLocationInfoList() throws IOException {
+    List<OmKeyLocationInfo> locationInfoList = new ArrayList<>();
+    for (BlockOutputStreamEntry streamEntry : streamEntries) {
+      OmKeyLocationInfo info =
+          new OmKeyLocationInfo.Builder().setBlockID(streamEntry.blockID)
+              .setLength(streamEntry.currentPosition).setOffset(0)
+              .build();
+      LOG.debug("block written {}, length {}, bcsID {}", streamEntry.blockID,
+          streamEntry.currentPosition,
+          streamEntry.blockID.getBlockCommitSequenceId());
+      locationInfoList.add(info);
+    }
+    return locationInfoList;
+  }
+
+  public KeyOutputStream(OpenKeySession handler,
+      XceiverClientManager xceiverClientManager,
+      StorageContainerLocationProtocolClientSideTranslatorPB scmClient,
+      OzoneManagerProtocolClientSideTranslatorPB omClient, int chunkSize,
+      String requestId, ReplicationFactor factor, ReplicationType type,
+      long bufferFlushSize, long bufferMaxSize, long size, long watchTimeout,
+      Checksum checksum, String uploadID, int partNumber,
+      boolean isMultipart) {
+    this.streamEntries = new ArrayList<>();
+    this.currentStreamIndex = 0;
+    this.omClient = omClient;
+    this.scmClient = scmClient;
+    OmKeyInfo info = handler.getKeyInfo();
+    this.keyArgs = new OmKeyArgs.Builder().setVolumeName(info.getVolumeName())
+        .setBucketName(info.getBucketName()).setKeyName(info.getKeyName())
+        .setType(type).setFactor(factor).setDataSize(info.getDataSize())
+        .setIsMultipartKey(isMultipart).setMultipartUploadID(
+            uploadID).setMultipartUploadPartNumber(partNumber)
+        .build();
+    this.openID = handler.getId();
+    this.xceiverClientManager = xceiverClientManager;
+    this.chunkSize = chunkSize;
+    this.requestID = requestId;
+    this.streamBufferFlushSize = bufferFlushSize;
+    this.streamBufferMaxSize = bufferMaxSize;
+    this.blockSize = size;
+    this.watchTimeout = watchTimeout;
+    this.checksum = checksum;
+
+    Preconditions.checkState(chunkSize > 0);
+    Preconditions.checkState(streamBufferFlushSize > 0);
+    Preconditions.checkState(streamBufferMaxSize > 0);
+    Preconditions.checkState(blockSize > 0);
+    Preconditions.checkState(streamBufferFlushSize % chunkSize == 0);
+    Preconditions.checkState(streamBufferMaxSize % streamBufferFlushSize == 0);
+    Preconditions.checkState(blockSize % streamBufferMaxSize == 0);
+    this.bufferList = new ArrayList<>();
+  }
+
+  /**
+   * When a key is opened, it is possible that some blocks are already
+   * allocated to it for this open session. In this case, to make use of
+   * these blocks, we need to add them to the stream entries. However, a
+   * key's version also includes blocks from previous versions, and we need
+   * to avoid adding these old blocks to the stream entries, because they
+   * should not be picked for write. To do this, this method adds only those
+   * blocks that were created in this particular open version.
+   *
+   * @param version the set of blocks that are pre-allocated.
+   * @param openVersion the version corresponding to the pre-allocation.
+   * @throws IOException
+   */
+  public void addPreallocateBlocks(OmKeyLocationInfoGroup version,
+      long openVersion) throws IOException {
+    // The server may return any number of blocks (zero or more). Only use
+    // the blocks allocated in this open session, i.e. those whose block
+    // createVersion equals the open session version.
+    for (OmKeyLocationInfo subKeyInfo : version.getLocationList()) {
+      if (subKeyInfo.getCreateVersion() == openVersion) {
+        addKeyLocationInfo(subKeyInfo);
+      }
+    }
+  }
+
+  private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo)
+      throws IOException {
+    ContainerWithPipeline containerWithPipeline = scmClient
+        .getContainerWithPipeline(subKeyInfo.getContainerID());
+    XceiverClientSpi xceiverClient = xceiverClientManager
+        .acquireClient(containerWithPipeline.getPipeline());
+    streamEntries.add(new BlockOutputStreamEntry(subKeyInfo.getBlockID(),
+        keyArgs.getKeyName(), xceiverClientManager, xceiverClient, requestID,
+        chunkSize, subKeyInfo.getLength(), streamBufferFlushSize,
+        streamBufferMaxSize, watchTimeout, bufferList, checksum));
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    byte[] buf = new byte[1];
+    buf[0] = (byte) b;
+    write(buf, 0, 1);
+  }
+
+  /**
+   * Tries to write the byte sequence b[off:off+len) to the streams.
+   *
+   * NOTE: Throws an exception if the data could not fit into the remaining
+   * space; in that case nothing will be written.
+   * TODO: May need to revisit this behaviour.
+   *
+   * @param b byte data
+   * @param off starting offset
+   * @param len length to write
+   * @throws IOException
+   */
+  @Override
+  public void write(byte[] b, int off, int len)
+      throws IOException {
+    checkNotClosed();
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
+        || ((off + len) < 0)) {
+      throw new IndexOutOfBoundsException();
+    }
+    if (len == 0) {
+      return;
+    }
+    handleWrite(b, off, len, false);
+  }
+
+  private long computeBufferData() {
+    return bufferList.stream().mapToInt(value -> value.position())
+        .sum();
+  }
+
+  private void handleWrite(byte[] b, int off, long len, boolean retry)
+      throws IOException {
+    int succeededAllocates = 0;
+    while (len > 0) {
+      if (streamEntries.size() <= currentStreamIndex) {
+        Preconditions.checkNotNull(omClient);
+        // Allocate a new block; if an exception happens, log an error and
+        // throw the exception to the caller directly, and the write fails.
+        try {
+          allocateNewBlock(currentStreamIndex);
+          succeededAllocates += 1;
+        } catch (IOException ioe) {
+          LOG.error("Allocating a new block for this write failed; already "
+              + "allocated " + succeededAllocates + " blocks for this write.",
+              ioe);
+          throw ioe;
+        }
+      }
+      // In theory, this condition should never be violated due to the check
+      // above; still, do a sanity check.
+      Preconditions.checkArgument(currentStreamIndex < streamEntries.size());
+      BlockOutputStreamEntry current = streamEntries.get(currentStreamIndex);
+
+      // length (len) will be in int range if the call is happening through
+      // the write API of chunkOutputStream. It can be in long range if it
+      // comes via the exception path.
+      int writeLen = (int) Math.min(len, current.getRemaining());
+      long currentPos = current.getWrittenDataLength();
+      try {
+        if (retry) {
+          current.writeOnRetry(len);
+        } else {
+          current.write(b, off, writeLen);
+        }
+      } catch (IOException ioe) {
+        if (checkIfContainerIsClosed(ioe) || checkIfTimeoutException(ioe)) {
+          // for the current iteration, totalDataWritten - currentPos gives the
+          // amount of data already written to the buffer
+          writeLen = (int) (current.getWrittenDataLength() - currentPos);
+          LOG.debug("writeLen {}, total len {}", writeLen, len);
+          handleException(current, currentStreamIndex);
+        } else {
+          throw ioe;
+        }
+      }
+      if (current.getRemaining() <= 0) {
+        // Since the current block is fully written, close the stream.
+        handleFlushOrClose(true);
+        currentStreamIndex += 1;
+      }
+      len -= writeLen;
+      off += writeLen;
+    }
+  }
+
+  /**
+   * Discards the subsequent pre-allocated blocks and removes their
+   * streamEntries from the list for the container that has been closed.
+   * @param containerID id of the closed container
+   */
+  private void discardPreallocatedBlocks(long containerID) {
+    // currentStreamIndex < streamEntries.size() signifies that there are
+    // still pre-allocated blocks available.
+    if (currentStreamIndex < streamEntries.size()) {
+      ListIterator<BlockOutputStreamEntry> streamEntryIterator =
+          streamEntries.listIterator(currentStreamIndex);
+      while (streamEntryIterator.hasNext()) {
+        if (streamEntryIterator.next().blockID.getContainerID()
+            == containerID) {
+          streamEntryIterator.remove();
+        }
+      }
+    }
+  }
+
+  /**
+   * Pre-allocated blocks might never get written while the stream closes
+   * normally. In such cases, trim the locationInfoList by removing any
+   * unused blocks, so that only the used block info gets updated on the
+   * OzoneManager during close.
+   */
+  private void removeEmptyBlocks() {
+    if (currentStreamIndex < streamEntries.size()) {
+      ListIterator<BlockOutputStreamEntry> streamEntryIterator =
+          streamEntries.listIterator(currentStreamIndex);
+      while (streamEntryIterator.hasNext()) {
+        if (streamEntryIterator.next().currentPosition == 0) {
+          streamEntryIterator.remove();
+        }
+      }
+    }
+  }
+
+  /**
+   * It performs the following actions:
+   * a. Updates the committed length for the current stream at the datanode.
+   * b. Reads the data from the underlying buffer and writes it to the next
+   *    stream.
+   *
+   * @param streamEntry StreamEntry
+   * @param streamIndex Index of the entry
+   * @throws IOException Throws IOException if the write fails
+   */
+  private void handleException(BlockOutputStreamEntry streamEntry,
+      int streamIndex) throws IOException {
+    long totalSuccessfulFlushedData =
+        streamEntry.getTotalSuccessfulFlushedData();
+    //set the correct length for the current stream
+    streamEntry.currentPosition = totalSuccessfulFlushedData;
+    long bufferedDataLen = computeBufferData();
+    // just clean up the current stream.
+    streamEntry.cleanup();
+    if (bufferedDataLen > 0) {
+      // If the data is still cached in the underlying stream, we need to
+      // allocate new block and write this data in the datanode.
+      currentStreamIndex += 1;
+      handleWrite(null, 0, bufferedDataLen, true);
+    }
+    if (totalSuccessfulFlushedData == 0) {
+      streamEntries.remove(streamIndex);
+      currentStreamIndex -= 1;
+    }
+    // discard subsequent pre allocated blocks from the streamEntries list
+    // from the closed container
+    discardPreallocatedBlocks(streamEntry.blockID.getContainerID());
+  }
+
+  private boolean checkIfContainerIsClosed(IOException ioe) {
+    if (ioe.getCause() != null) {
+      return checkIfContainerNotOpenOrRaftRetryFailureException(ioe)
+          || Optional.of(ioe.getCause())
+          .filter(e -> e instanceof StorageContainerException)
+          .map(e -> (StorageContainerException) e)
+          .filter(sce -> sce.getResult() == Result.CLOSED_CONTAINER_IO)
+          .isPresent();
+    }
+    return false;
+  }
+
+  private boolean checkIfContainerNotOpenOrRaftRetryFailureException(
+      IOException ioe) {
+    Throwable t = ioe.getCause();
+    while (t != null) {
+      if (t instanceof ContainerNotOpenException
+          || t instanceof RaftRetryFailureException) {
+        return true;
+      }
+      t = t.getCause();
+    }
+    return false;
+  }
+
+  private boolean checkIfTimeoutException(IOException ioe) {
+    if (ioe.getCause() != null) {
+      return Optional.of(ioe.getCause())
+          .filter(e -> e instanceof TimeoutException).isPresent();
+    } else {
+      return false;
+    }
+  }
+
+  private long getKeyLength() {
+    return streamEntries.stream().mapToLong(e -> e.currentPosition)
+        .sum();
+  }
+
+  /**
+   * Contacts OM to get a new block and adds it at the given index (e.g. the
+   * first block has index = 0, the second has index = 1, etc.).
+   *
+   * The returned block is wrapped in a new BlockOutputStreamEntry for write.
+   *
+   * @param index the index of the block.
+   * @throws IOException
+   */
+  private void allocateNewBlock(int index) throws IOException {
+    OmKeyLocationInfo subKeyInfo = omClient.allocateBlock(keyArgs, openID);
+    addKeyLocationInfo(subKeyInfo);
+  }
+
+  @Override
+  public void flush() throws IOException {
+    checkNotClosed();
+    handleFlushOrClose(false);
+  }
+
+  /**
+   * Closes or flushes the latest outputStream.
+   * @param close Flag which decides whether to call close or flush on the
+   *              outputStream.
+   * @throws IOException in case flush or close fails with an exception.
+   */
+  private void handleFlushOrClose(boolean close) throws IOException {
+    if (streamEntries.size() == 0) {
+      return;
+    }
+    int size = streamEntries.size();
+    int streamIndex =
+        currentStreamIndex >= size ? size - 1 : currentStreamIndex;
+    BlockOutputStreamEntry entry = streamEntries.get(streamIndex);
+    if (entry != null) {
+      try {
+        if (close) {
+          entry.close();
+        } else {
+          entry.flush();
+        }
+      } catch (IOException ioe) {
+        if (checkIfContainerIsClosed(ioe) || checkIfTimeoutException(ioe)) {
+          // This call will allocate a new streamEntry and write the data.
+          // Close needs to be retried on the newly allocated streamEntry as
+          // well.
+          handleException(entry, streamIndex);
+          handleFlushOrClose(close);
+        } else {
+          throw ioe;
+        }
+      }
+    }
+  }
+
+  /**
+   * Commits the key to OM; this adds the blocks as the new key blocks.
+   *
+   * @throws IOException
+   */
+  @Override
+  public void close() throws IOException {
+    if (closed) {
+      return;
+    }
+    closed = true;
+    try {
+      handleFlushOrClose(true);
+      if (keyArgs != null) {
+        // in test, this could be null
+        removeEmptyBlocks();
+        keyArgs.setDataSize(getKeyLength());
+        keyArgs.setLocationInfoList(getLocationInfoList());
+        // When the key is a multipart upload part, we should not commit the
+        // key, as this is not an actual key; it is just a part of a larger
+        // file.
+        if (keyArgs.getIsMultipartKey()) {
+          commitUploadPartInfo = omClient.commitMultipartUploadPart(keyArgs,
+              openID);
+        } else {
+          omClient.commitKey(keyArgs, openID);
+        }
+      } else {
+        LOG.warn("Closing KeyOutputStream, but key args is null");
+      }
+    } finally {
+      if (bufferList != null) {
+        bufferList.stream().forEach(e -> e.clear());
+      }
+      bufferList = null;
+    }
+  }
+
+  public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
+    return commitUploadPartInfo;
+  }
+
+  /**
+   * Builder class of KeyOutputStream.
+   */
+  public static class Builder {
+    private OpenKeySession openHandler;
+    private XceiverClientManager xceiverManager;
+    private StorageContainerLocationProtocolClientSideTranslatorPB scmClient;
+    private OzoneManagerProtocolClientSideTranslatorPB omClient;
+    private int chunkSize;
+    private String requestID;
+    private ReplicationType type;
+    private ReplicationFactor factor;
+    private long streamBufferFlushSize;
+    private long streamBufferMaxSize;
+    private long blockSize;
+    private long watchTimeout;
+    private Checksum checksum;
+    private String multipartUploadID;
+    private int multipartNumber;
+    private boolean isMultipartKey;
+
+    public Builder setMultipartUploadID(String uploadID) {
+      this.multipartUploadID = uploadID;
+      return this;
+    }
+
+    public Builder setMultipartNumber(int partNumber) {
+      this.multipartNumber = partNumber;
+      return this;
+    }
+
+    public Builder setHandler(OpenKeySession handler) {
+      this.openHandler = handler;
+      return this;
+    }
+
+    public Builder setXceiverClientManager(XceiverClientManager manager) {
+      this.xceiverManager = manager;
+      return this;
+    }
+
+    public Builder setScmClient(
+        StorageContainerLocationProtocolClientSideTranslatorPB client) {
+      this.scmClient = client;
+      return this;
+    }
+
+    public Builder setOmClient(
+        OzoneManagerProtocolClientSideTranslatorPB client) {
+      this.omClient = client;
+      return this;
+    }
+
+    public Builder setChunkSize(int size) {
+      this.chunkSize = size;
+      return this;
+    }
+
+    public Builder setRequestID(String id) {
+      this.requestID = id;
+      return this;
+    }
+
+    public Builder setType(ReplicationType replicationType) {
+      this.type = replicationType;
+      return this;
+    }
+
+    public Builder setFactor(ReplicationFactor replicationFactor) {
+      this.factor = replicationFactor;
+      return this;
+    }
+
+    public Builder setStreamBufferFlushSize(long size) {
+      this.streamBufferFlushSize = size;
+      return this;
+    }
+
+    public Builder setStreamBufferMaxSize(long size) {
+      this.streamBufferMaxSize = size;
+      return this;
+    }
+
+    public Builder setBlockSize(long size) {
+      this.blockSize = size;
+      return this;
+    }
+
+    public Builder setWatchTimeout(long timeout) {
+      this.watchTimeout = timeout;
+      return this;
+    }
+
+    public Builder setChecksum(Checksum checksumObj) {
+      this.checksum = checksumObj;
+      return this;
+    }
+
+    public Builder setIsMultipartKey(boolean isMultipart) {
+      this.isMultipartKey = isMultipart;
+      return this;
+    }
+
+    public KeyOutputStream build() throws IOException {
+      return new KeyOutputStream(openHandler, xceiverManager, scmClient,
+          omClient, chunkSize, requestID, factor, type, streamBufferFlushSize,
+          streamBufferMaxSize, blockSize, watchTimeout, checksum,
+          multipartUploadID, multipartNumber, isMultipartKey);
+    }
+  }
+
+  private static class BlockOutputStreamEntry extends OutputStream {
+    private OutputStream outputStream;
+    private BlockID blockID;
+    private final String key;
+    private final XceiverClientManager xceiverClientManager;
+    private final XceiverClientSpi xceiverClient;
+    private final Checksum checksum;
+    private final String requestId;
+    private final int chunkSize;
+    // total number of bytes that should be written to this stream
+    private final long length;
+    // the current position of this stream: 0 <= currentPosition <= length
+    private long currentPosition;
+
+    private final long streamBufferFlushSize;
+    private final long streamBufferMaxSize;
+    private final long watchTimeout;
+    private List<ByteBuffer> bufferList;
+
+    BlockOutputStreamEntry(BlockID blockID, String key,
+        XceiverClientManager xceiverClientManager,
+        XceiverClientSpi xceiverClient, String requestId, int chunkSize,
+        long length, long streamBufferFlushSize, long streamBufferMaxSize,
+        long watchTimeout, List<ByteBuffer> bufferList, Checksum checksum) {
+      this.outputStream = null;
+      this.blockID = blockID;
+      this.key = key;
+      this.xceiverClientManager = xceiverClientManager;
+      this.xceiverClient = xceiverClient;
+      this.requestId = requestId;
+      this.chunkSize = chunkSize;
+
+      this.length = length;
+      this.currentPosition = 0;
+      this.streamBufferFlushSize = streamBufferFlushSize;
+      this.streamBufferMaxSize = streamBufferMaxSize;
+      this.watchTimeout = watchTimeout;
+      this.checksum = checksum;
+      this.bufferList = bufferList;
+    }
+
+    /**
+     * For testing purposes only; takes an externally created stream instance.
+     * @param  outputStream an existing writable output stream
+     * @param  length the length of data to write to the stream
+     */
+    BlockOutputStreamEntry(OutputStream outputStream, long length,
+        Checksum checksum) {
+      this.outputStream = outputStream;
+      this.blockID = null;
+      this.key = null;
+      this.xceiverClientManager = null;
+      this.xceiverClient = null;
+      this.requestId = null;
+      this.chunkSize = -1;
+
+      this.length = length;
+      this.currentPosition = 0;
+      streamBufferFlushSize = 0;
+      streamBufferMaxSize = 0;
+      bufferList = null;
+      watchTimeout = 0;
+      this.checksum = checksum;
+    }
+
+    long getLength() {
+      return length;
+    }
+
+    long getRemaining() {
+      return length - currentPosition;
+    }
+
+    private void checkStream() {
+      if (this.outputStream == null) {
+        this.outputStream =
+            new BlockOutputStream(blockID, key, xceiverClientManager,
+                xceiverClient, requestId, chunkSize, streamBufferFlushSize,
+                streamBufferMaxSize, watchTimeout, bufferList, checksum);
+      }
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+      checkStream();
+      outputStream.write(b);
+      this.currentPosition += 1;
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+      checkStream();
+      outputStream.write(b, off, len);
+      this.currentPosition += len;
+    }
+
+    @Override
+    public void flush() throws IOException {
+      if (this.outputStream != null) {
+        this.outputStream.flush();
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (this.outputStream != null) {
+        this.outputStream.close();
+        // after closing the BlockOutputStream, the blockID would have been
+        // reconstructed with the updated bcsID
+        if (this.outputStream instanceof BlockOutputStream) {
+          this.blockID = ((BlockOutputStream) outputStream).getBlockID();
+        }
+      }
+    }
+
+    long getTotalSuccessfulFlushedData() throws IOException {
+      if (this.outputStream instanceof BlockOutputStream) {
+        BlockOutputStream out = (BlockOutputStream) this.outputStream;
+        blockID = out.getBlockID();
+        return out.getTotalSuccessfulFlushedData();
+      } else if (outputStream == null) {
+        // For a pre allocated block for which no write has been initiated,
+        // the OutputStream will be null here.
+        // In such cases, the default blockCommitSequenceId will be 0
+        return 0;
+      }
+      throw new IOException("Invalid Output Stream for Key: " + key);
+    }
+
+    long getWrittenDataLength() throws IOException {
+      if (this.outputStream instanceof BlockOutputStream) {
+        BlockOutputStream out = (BlockOutputStream) this.outputStream;
+        return out.getWrittenDataLength();
+      } else if (outputStream == null) {
+        // For a pre allocated block for which no write has been initiated,
+        // the OutputStream will be null here.
+        // In such cases, the written data length is 0.
+        return 0;
+      }
+      throw new IOException("Invalid Output Stream for Key: " + key);
+    }
+
+    void cleanup() {
+      checkStream();
+      if (this.outputStream instanceof BlockOutputStream) {
+        BlockOutputStream out = (BlockOutputStream) this.outputStream;
+        out.cleanup();
+      }
+    }
+
+    void writeOnRetry(long len) throws IOException {
+      checkStream();
+      if (this.outputStream instanceof BlockOutputStream) {
+        BlockOutputStream out = (BlockOutputStream) this.outputStream;
+        out.writeOnRetry(len);
+        this.currentPosition += len;
+      } else {
+        throw new IOException("Invalid Output Stream for Key: " + key);
+      }
+    }
+  }
+
+  /**
+   * Verifies that the output stream is open. Non-blocking; this gives
+   * the last state of the {@link #closed} field.
+   * @throws IOException if the connection is closed.
+   */
+  private void checkNotClosed() throws IOException {
+    if (closed) {
+      throw new IOException(
+          ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: " + keyArgs
+              .getKeyName());
+    }
+  }
+}
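
For orientation, here is a minimal usage sketch for the new class (not part
of the patch). It mirrors how RpcClient and DistributedStorageHandler wire up
the Builder further down in this commit; the openKey session, client handles,
size constants and payload are assumptions standing in for the surrounding
client plumbing. Note the Builder's preconditions: chunkSize > 0,
streamBufferFlushSize a multiple of chunkSize, streamBufferMaxSize a multiple
of streamBufferFlushSize, and blockSize a multiple of streamBufferMaxSize.

  // Hedged usage sketch; all handles below come from the client context.
  KeyOutputStream keyOut = new KeyOutputStream.Builder()
      .setHandler(openKey)                 // OpenKeySession from OM openKey()
      .setXceiverClientManager(xceiverClientManager)
      .setScmClient(storageContainerLocationClient)
      .setOmClient(ozoneManagerClient)
      .setChunkSize(chunkSize)
      .setRequestID(UUID.randomUUID().toString())
      .setType(ReplicationType.RATIS)
      .setFactor(ReplicationFactor.THREE)
      .setStreamBufferFlushSize(streamBufferFlushSize)
      .setStreamBufferMaxSize(streamBufferMaxSize)
      .setBlockSize(blockSize)
      .setWatchTimeout(watchTimeout)
      .setChecksum(checksum)
      .build();
  // Reuse blocks pre-allocated in this open session (see
  // addPreallocateBlocks above), then write; close() commits the key to OM.
  keyOut.addPreallocateBlocks(
      openKey.getKeyInfo().getLatestVersionLocations(),
      openKey.getOpenVersion());
  keyOut.write(data);
  keyOut.close();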

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ff1c46d/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java
index 8a896ad..e4a7d6a 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java
@@ -24,14 +24,14 @@ import java.io.OutputStream;
 
 /**
  * OzoneOutputStream is used to write data into Ozone.
- * It uses SCM's {@link ChunkGroupOutputStream} for writing the data.
+ * It uses the {@link KeyOutputStream} for writing the data.
  */
 public class OzoneOutputStream extends OutputStream {
 
   private final OutputStream outputStream;
 
   /**
-   * Constructs OzoneOutputStream with ChunkGroupOutputStream.
+   * Constructs OzoneOutputStream with KeyOutputStream.
    *
    * @param outputStream
    */
@@ -61,8 +61,8 @@ public class OzoneOutputStream extends OutputStream {
   }
 
   public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
-    if (outputStream instanceof ChunkGroupOutputStream) {
-      return ((ChunkGroupOutputStream) outputStream).getCommitUploadPartInfo();
+    if (outputStream instanceof KeyOutputStream) {
+      return ((KeyOutputStream) outputStream).getCommitUploadPartInfo();
     }
     // Otherwise return null.
     return null;
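
As the hunk above shows, getCommitUploadPartInfo() returns non-null only when
the wrapped stream is a KeyOutputStream that committed a multipart upload
part. A minimal sketch of the consuming side, assuming partStream is an
OzoneOutputStream opened for a multipart part (names and the getPartName
accessor are illustrative):

  partStream.write(partBytes);
  // KeyOutputStream.close() commits the part (not the key) to the OM and
  // records the commit info.
  partStream.close();
  OmMultipartCommitUploadPartInfo partInfo =
      partStream.getCommitUploadPartInfo();
  if (partInfo != null) {
    String partName = partInfo.getPartName();  // assumed accessor
  }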

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ff1c46d/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 6cf6c4f..6a45887 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -39,7 +39,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.ozone.client.VolumeArgs;
 import org.apache.hadoop.ozone.client.io.ChunkGroupInputStream;
-import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream;
+import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.LengthInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
@@ -501,8 +501,8 @@ public class RpcClient implements ClientProtocol {
         .build();
 
     OpenKeySession openKey = ozoneManagerClient.openKey(keyArgs);
-    ChunkGroupOutputStream groupOutputStream =
-        new ChunkGroupOutputStream.Builder()
+    KeyOutputStream groupOutputStream =
+        new KeyOutputStream.Builder()
             .setHandler(openKey)
             .setXceiverClientManager(xceiverClientManager)
             .setScmClient(storageContainerLocationClient)
@@ -726,8 +726,8 @@ public class RpcClient implements ClientProtocol {
         .build();
 
     OpenKeySession openKey = ozoneManagerClient.openKey(keyArgs);
-    ChunkGroupOutputStream groupOutputStream =
-        new ChunkGroupOutputStream.Builder()
+    KeyOutputStream groupOutputStream =
+        new KeyOutputStream.Builder()
             .setHandler(openKey)
             .setXceiverClientManager(xceiverClientManager)
             .setScmClient(storageContainerLocationClient)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ff1c46d/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
index 6b7276e..8740eba 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
@@ -36,7 +36,7 @@ import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
-import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream;
+import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
@@ -133,7 +133,7 @@ public class TestCloseContainerHandlingByClient {
         .getBytes(UTF_8);
     key.write(data);
 
-    Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
+    Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
     //get the name of a valid container
     OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
         .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
@@ -165,7 +165,7 @@ public class TestCloseContainerHandlingByClient {
         .getBytes(UTF_8);
     key.write(data);
 
-    Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
+    Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
     //get the name of a valid container
     OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
         .setBucketName(bucketName)
@@ -188,10 +188,10 @@ public class TestCloseContainerHandlingByClient {
     String keyName = getKeyName();
     OzoneOutputStream key =
         createKey(keyName, ReplicationType.RATIS, (4 * blockSize));
-    ChunkGroupOutputStream groupOutputStream =
-        (ChunkGroupOutputStream) key.getOutputStream();
+    KeyOutputStream keyOutputStream =
+        (KeyOutputStream) key.getOutputStream();
     // With the initial size provided, it should have preallocated 4 blocks
-    Assert.assertEquals(4, groupOutputStream.getStreamEntries().size());
+    Assert.assertEquals(4, keyOutputStream.getStreamEntries().size());
     // write data more than 1 chunk
     byte[] data =
         ContainerTestHelper.getFixedLengthString(keyString, (3 * blockSize))
@@ -199,7 +199,7 @@ public class TestCloseContainerHandlingByClient {
     Assert.assertEquals(data.length, 3 * blockSize);
     key.write(data);
 
-    Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
+    Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
     //get the name of a valid container
     OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
         .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
@@ -234,12 +234,12 @@ public class TestCloseContainerHandlingByClient {
     String keyName = getKeyName();
     OzoneOutputStream key =
         createKey(keyName, ReplicationType.RATIS, 4 * blockSize);
-    ChunkGroupOutputStream groupOutputStream =
-        (ChunkGroupOutputStream) key.getOutputStream();
+    KeyOutputStream keyOutputStream =
+        (KeyOutputStream) key.getOutputStream();
 
-    Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
+    Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
     // With the initial size provided, it should have pre allocated 4 blocks
-    Assert.assertEquals(4, groupOutputStream.getStreamEntries().size());
+    Assert.assertEquals(4, keyOutputStream.getStreamEntries().size());
     String dataString =
         ContainerTestHelper.getFixedLengthString(keyString, (2 * blockSize));
     byte[] data = dataString.getBytes(UTF_8);
@@ -278,10 +278,10 @@ public class TestCloseContainerHandlingByClient {
     String keyName = getKeyName();
     int keyLen = 4 * blockSize;
     OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, keyLen);
-    ChunkGroupOutputStream groupOutputStream =
-        (ChunkGroupOutputStream) key.getOutputStream();
+    KeyOutputStream keyOutputStream =
+        (KeyOutputStream) key.getOutputStream();
     // With the initial size provided, it should have preallocated 4 blocks
-    Assert.assertEquals(4, groupOutputStream.getStreamEntries().size());
+    Assert.assertEquals(4, keyOutputStream.getStreamEntries().size());
     // write data 3 blocks and one more chunk
     byte[] writtenData =
         ContainerTestHelper.getFixedLengthString(keyString, keyLen)
@@ -290,7 +290,7 @@ public class TestCloseContainerHandlingByClient {
     Assert.assertEquals(data.length, 3 * blockSize + chunkSize);
     key.write(data);
 
-    Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
+    Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
     //get the name of a valid container
     OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
         .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
@@ -329,10 +329,10 @@ public class TestCloseContainerHandlingByClient {
   private void waitForContainerClose(String keyName,
       OzoneOutputStream outputStream)
       throws Exception {
-    ChunkGroupOutputStream groupOutputStream =
-        (ChunkGroupOutputStream) outputStream.getOutputStream();
+    KeyOutputStream keyOutputStream =
+        (KeyOutputStream) outputStream.getOutputStream();
     List<OmKeyLocationInfo> locationInfoList =
-        groupOutputStream.getLocationInfoList();
+        keyOutputStream.getLocationInfoList();
     List<Long> containerIdList = new ArrayList<>();
     for (OmKeyLocationInfo info : locationInfoList) {
       containerIdList.add(info.getContainerID());
@@ -397,18 +397,18 @@ public class TestCloseContainerHandlingByClient {
     String keyName = getKeyName();
     OzoneOutputStream key =
         createKey(keyName, ReplicationType.RATIS, 2 * blockSize);
-    ChunkGroupOutputStream groupOutputStream =
-        (ChunkGroupOutputStream) key.getOutputStream();
+    KeyOutputStream keyOutputStream =
+        (KeyOutputStream) key.getOutputStream();
 
-    Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
+    Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
     // With the initial size provided, it should have pre allocated 2 blocks
-    Assert.assertEquals(2, groupOutputStream.getStreamEntries().size());
+    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
     String dataString =
         ContainerTestHelper.getFixedLengthString(keyString, (1 * blockSize));
     byte[] data = dataString.getBytes(UTF_8);
     key.write(data);
     List<OmKeyLocationInfo> locationInfos =
-        new ArrayList<>(groupOutputStream.getLocationInfoList());
+        new ArrayList<>(keyOutputStream.getLocationInfoList());
     long containerID = locationInfos.get(0).getContainerID();
     ContainerInfo container =
         cluster.getStorageContainerManager().getContainerManager()
@@ -423,16 +423,16 @@ public class TestCloseContainerHandlingByClient {
         ContainerTestHelper.getFixedLengthString(keyString, (1 * blockSize));
     data = dataString.getBytes(UTF_8);
     key.write(data);
-    Assert.assertEquals(2, groupOutputStream.getStreamEntries().size());
+    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
 
     // the 1st block got written. Now all the containers are closed, so the 2nd
     // pre allocated block will be removed from the list and new block should
     // have been allocated
     Assert.assertTrue(
-        groupOutputStream.getLocationInfoList().get(0).getBlockID()
+        keyOutputStream.getLocationInfoList().get(0).getBlockID()
             .equals(locationInfos.get(0).getBlockID()));
     Assert.assertFalse(
-        groupOutputStream.getLocationInfoList().get(1).getBlockID()
+        keyOutputStream.getLocationInfoList().get(1).getBlockID()
             .equals(locationInfos.get(1).getBlockID()));
     key.close();
   }
@@ -463,7 +463,7 @@ public class TestCloseContainerHandlingByClient {
         .setFactor(HddsProtos.ReplicationFactor.THREE).setKeyName(keyName)
         .build();
 
-    Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
+    Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
     waitForContainerClose(keyName, key);
     // Again Write the Data. This will throw an exception which will be handled
     // and new blocks will be allocated

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ff1c46d/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
index c1827c9..942b847 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
-import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream;
+import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
@@ -126,8 +126,8 @@ public class TestContainerStateMachineFailures {
         setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
         .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName("ratis")
         .build();
-    ChunkGroupOutputStream groupOutputStream =
-        (ChunkGroupOutputStream) key.getOutputStream();
+    KeyOutputStream groupOutputStream =
+        (KeyOutputStream) key.getOutputStream();
     List<OmKeyLocationInfo> locationInfoList =
         groupOutputStream.getLocationInfoList();
     Assert.assertEquals(1, locationInfoList.size());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ff1c46d/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
index 1ba4820..f94257c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
-import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream;
+import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
@@ -121,9 +121,9 @@ public class TestFailureHandlingByClient {
     key.write(data);
 
     // get the name of a valid container
-    Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
-    ChunkGroupOutputStream groupOutputStream =
-        (ChunkGroupOutputStream) key.getOutputStream();
+    Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+    KeyOutputStream groupOutputStream =
+        (KeyOutputStream) key.getOutputStream();
     List<OmKeyLocationInfo> locationInfoList =
         groupOutputStream.getLocationInfoList();
     Assert.assertTrue(locationInfoList.size() == 1);
@@ -160,9 +160,9 @@ public class TestFailureHandlingByClient {
     key.write(data.getBytes());
 
     // get the name of a valid container
-    Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
-    ChunkGroupOutputStream groupOutputStream =
-        (ChunkGroupOutputStream) key.getOutputStream();
+    Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+    KeyOutputStream groupOutputStream =
+        (KeyOutputStream) key.getOutputStream();
     List<OmKeyLocationInfo> locationInfoList =
         groupOutputStream.getLocationInfoList();
     Assert.assertTrue(locationInfoList.size() == 2);
@@ -201,9 +201,9 @@ public class TestFailureHandlingByClient {
     key.write(data.getBytes());
 
     // get the name of a valid container
-    Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
-    ChunkGroupOutputStream groupOutputStream =
-        (ChunkGroupOutputStream) key.getOutputStream();
+    Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+    KeyOutputStream groupOutputStream =
+        (KeyOutputStream) key.getOutputStream();
     List<OmKeyLocationInfo> locationInfoList =
         groupOutputStream.getLocationInfoList();
     Assert.assertTrue(locationInfoList.size() == 6);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ff1c46d/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
index 474b920..90c3c1f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
@@ -39,7 +39,7 @@ import org.apache.hadoop.hdds.client.OzoneQuota;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.ozone.client.VolumeArgs;
-import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream;
+import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.common.OzoneChecksumException;
@@ -666,8 +666,8 @@ public class TestOzoneRpcClient {
     OzoneOutputStream out = bucket
         .createKey(keyName, value.getBytes().length, ReplicationType.RATIS,
             ReplicationFactor.THREE);
-    ChunkGroupOutputStream groupOutputStream =
-        (ChunkGroupOutputStream) out.getOutputStream();
+    KeyOutputStream groupOutputStream =
+        (KeyOutputStream) out.getOutputStream();
     XceiverClientManager manager = groupOutputStream.getXceiverClientManager();
     out.write(value.getBytes());
     out.close();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ff1c46d/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
index a587ca3..a078c01 100644
--- a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
+++ b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.LengthInputStream;
 import org.apache.hadoop.ozone.common.Checksum;
 import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
@@ -38,7 +39,6 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.OzoneConsts.Versioning;
 import org.apache.hadoop.ozone.client.io.ChunkGroupInputStream;
-import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocolPB.OMPBHelper;
@@ -437,8 +437,8 @@ public final class DistributedStorageHandler implements StorageHandler {
         .build();
     // contact OM to allocate a block for key.
     OpenKeySession openKey = ozoneManagerClient.openKey(keyArgs);
-    ChunkGroupOutputStream groupOutputStream =
-        new ChunkGroupOutputStream.Builder()
+    KeyOutputStream groupOutputStream =
+        new KeyOutputStream.Builder()
             .setHandler(openKey)
             .setXceiverClientManager(xceiverClientManager)
             .setScmClient(storageContainerLocationClient)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ff1c46d/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java
index 6b91837..4ef46a3 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java
@@ -31,7 +31,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.junit.Assert.assertEquals;
 
 /**
- * This class tests ChunkGroupInputStream and ChunkGroupOutStream.
+ * This class tests ChunkGroupInputStream and KeyOutputStream.
  */
 public class TestChunkStreams {
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ff1c46d/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSOutputStream.java b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSOutputStream.java
index faa3628..3670cff 100644
--- a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSOutputStream.java
+++ b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSOutputStream.java
@@ -20,7 +20,7 @@ package org.apache.hadoop.fs.ozone;
 
 import java.io.IOException;
 import java.io.OutputStream;
-import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream;
+import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 
 
 /**
@@ -31,10 +31,10 @@ import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream;
  */
 public class OzoneFSOutputStream extends OutputStream {
 
-  private final ChunkGroupOutputStream outputStream;
+  private final KeyOutputStream outputStream;
 
   public OzoneFSOutputStream(OutputStream outputStream) {
-    this.outputStream = (ChunkGroupOutputStream)outputStream;
+    this.outputStream = (KeyOutputStream)outputStream;
   }
 
   @Override

