HDDS-173. Refactor Dispatcher and implement Handler for new ContainerIO design.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/13579f92
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/13579f92
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/13579f92

Branch: refs/heads/trunk
Commit: 13579f9296ddb087ff5fd24061f3aaf17741e6c9
Parents: ca192cb
Author: Hanisha Koneru <hanishakon...@apache.org>
Authored: Thu Jun 28 10:13:30 2018 -0700
Committer: Hanisha Koneru <hanishakon...@apache.org>
Committed: Thu Jun 28 10:13:30 2018 -0700

----------------------------------------------------------------------
 .../scm/storage/ContainerProtocolCalls.java     |  24 +-
 .../apache/hadoop/ozone/OzoneConfigKeys.java    |  12 +
 .../main/proto/DatanodeContainerProtocol.proto  |  20 +-
 .../common/src/main/resources/ozone-default.xml |  33 +
 .../container/common/helpers/ChunkUtils.java    |   5 +-
 .../common/helpers/ContainerUtils.java          | 169 ++---
 .../container/common/helpers/FileUtils.java     |  82 ---
 .../container/common/helpers/KeyUtils.java      |   5 +-
 .../container/common/impl/ContainerSet.java     |   7 +-
 .../ozone/container/common/impl/Dispatcher.java |  49 +-
 .../container/common/impl/HddsDispatcher.java   | 180 ++++++
 .../common/impl/KeyValueContainerData.java      | 191 ------
 .../container/common/impl/KeyValueYaml.java     | 275 --------
 .../container/common/interfaces/Container.java  |  15 +
 .../container/common/interfaces/Handler.java    |  71 ++
 .../server/ratis/ContainerStateMachine.java     |   5 +-
 .../container/common/volume/VolumeSet.java      |  11 +-
 .../container/keyvalue/ChunkManagerImpl.java    | 240 -------
 .../container/keyvalue/KeyManagerImpl.java      | 188 ------
 .../container/keyvalue/KeyValueContainer.java   |  29 +-
 .../keyvalue/KeyValueContainerData.java         | 200 ++++++
 .../keyvalue/KeyValueContainerLocationUtil.java | 140 ----
 .../keyvalue/KeyValueContainerUtil.java         | 148 -----
 .../container/keyvalue/KeyValueHandler.java     | 643 +++++++++++++++++++
 .../ozone/container/keyvalue/KeyValueYaml.java  | 272 ++++++++
 .../container/keyvalue/helpers/ChunkUtils.java  |  50 +-
 .../container/keyvalue/helpers/KeyUtils.java    |  35 +-
 .../helpers/KeyValueContainerLocationUtil.java  | 140 ++++
 .../keyvalue/helpers/KeyValueContainerUtil.java | 173 +++++
 .../keyvalue/helpers/SmallFileUtils.java        |  84 +++
 .../keyvalue/impl/ChunkManagerImpl.java         | 240 +++++++
 .../container/keyvalue/impl/KeyManagerImpl.java | 192 ++++++
 .../keyvalue/interfaces/KeyManager.java         |   4 +-
 .../common/TestKeyValueContainerData.java       |  36 +-
 .../container/common/impl/TestContainerSet.java |  12 +-
 .../container/common/impl/TestKeyValueYaml.java |   2 +
 .../common/interfaces/TestHandler.java          |  91 +++
 .../keyvalue/TestChunkManagerImpl.java          |   3 +-
 .../container/keyvalue/TestKeyManagerImpl.java  |   7 +-
 .../keyvalue/TestKeyValueContainer.java         |  10 +-
 .../container/keyvalue/TestKeyValueHandler.java | 246 +++++++
 .../ozone/client/io/ChunkGroupInputStream.java  |   6 +-
 .../client/io/OzoneContainerTranslation.java    |  50 --
 .../ozone/container/ContainerTestHelper.java    |  25 +-
 .../container/ozoneimpl/TestOzoneContainer.java |   6 +-
 .../genesis/BenchMarkDatanodeDispatcher.java    |  12 +-
 46 files changed, 2881 insertions(+), 1557 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index d3af083..f4f14ef 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -29,6 +29,8 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .DatanodeBlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .GetKeyRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .GetKeyResponseProto;
@@ -73,16 +75,16 @@ public final class ContainerProtocolCalls  {
    * Calls the container protocol to get a container key.
    *
    * @param xceiverClient client to perform call
-   * @param containerKeyData key data to identify container
+   * @param datanodeBlockID blockID to identify container
    * @param traceID container protocol call args
    * @return container protocol get key response
    * @throws IOException if there is an I/O error while performing the call
    */
   public static GetKeyResponseProto getKey(XceiverClientSpi xceiverClient,
-      KeyData containerKeyData, String traceID) throws IOException {
+      DatanodeBlockID datanodeBlockID, String traceID) throws IOException {
     GetKeyRequestProto.Builder readKeyRequest = GetKeyRequestProto
         .newBuilder()
-        .setKeyData(containerKeyData);
+        .setBlockID(datanodeBlockID);
     String id = xceiverClient.getPipeline().getLeader().getUuidString();
     ContainerCommandRequestProto request = ContainerCommandRequestProto
         .newBuilder()
@@ -240,18 +242,15 @@ public final class ContainerProtocolCalls  {
     ContainerProtos.CreateContainerRequestProto.Builder createRequest =
         ContainerProtos.CreateContainerRequestProto
             .newBuilder();
-    ContainerProtos.ContainerData.Builder containerData = ContainerProtos
-        .ContainerData.newBuilder();
-    containerData.setContainerID(containerID);
-    containerData.setContainerType(ContainerProtos.ContainerType
+    createRequest.setContainerID(containerID);
+    createRequest.setContainerType(ContainerProtos.ContainerType
         .KeyValueContainer);
-    createRequest.setContainerData(containerData.build());
 
     String id = client.getPipeline().getLeader().getUuidString();
     ContainerCommandRequestProto.Builder request =
         ContainerCommandRequestProto.newBuilder();
     request.setCmdType(ContainerProtos.Type.CreateContainer);
-    request.setCreateContainer(createRequest);
+    request.setCreateContainer(createRequest.build());
     request.setDatanodeUuid(id);
     request.setTraceID(traceID);
     ContainerCommandResponseProto response = client.sendCommand(
@@ -348,14 +347,9 @@ public final class ContainerProtocolCalls  {
    */
   public static GetSmallFileResponseProto readSmallFile(XceiverClientSpi client,
       BlockID blockID, String traceID) throws IOException {
-    KeyData containerKeyData = KeyData
-        .newBuilder()
-        .setBlockID(blockID.getDatanodeBlockIDProtobuf())
-        .build();
-
     GetKeyRequestProto.Builder getKey = GetKeyRequestProto
         .newBuilder()
-        .setKeyData(containerKeyData);
+        .setBlockID(blockID.getDatanodeBlockIDProtobuf());
     ContainerProtos.GetSmallFileRequestProto getSmallFileRequest =
         GetSmallFileRequestProto
             .newBuilder().setKey(getKey)

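For reference, a minimal sketch of the reworked getKey call path: the client
now passes a DatanodeBlockID instead of a full KeyData message. The
xceiverClient setup and the literal IDs below are illustrative, not part of
this patch:

    // Identify the block directly by (containerID, localID); no KeyData
    // wrapper is needed on the read path any more.
    ContainerProtos.DatanodeBlockID blockID =
        ContainerProtos.DatanodeBlockID.newBuilder()
            .setContainerID(1L)
            .setLocalID(100L)
            .build();
    ContainerProtos.GetKeyResponseProto response =
        ContainerProtocolCalls.getKey(xceiverClient, blockID, "traceID-1");
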
http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 856d088..fc10fbb 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -256,6 +256,18 @@ public final class OzoneConfigKeys {
       "hdds.datanode.storage.utilization.critical.threshold";
   public static final double
       HDDS_DATANODE_STORAGE_UTILIZATION_CRITICAL_THRESHOLD_DEFAULT = 0.75;
+
+  public static final String
+      HDDS_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY =
+      "hdds.write.lock.reporting.threshold.ms";
+  public static final long
+      HDDS_WRITE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT = 5000L;
+  public static final String
+      HDDS_LOCK_SUPPRESS_WARNING_INTERVAL_MS_KEY =
+      "hdds.lock.suppress.warning.interval.ms";
+  public static final long
+      HDDS_LOCK_SUPPRESS_WARNING_INTERVAL_MS_DEFAULT = 10000L;
+
   /**
    * There is no need to instantiate this class.
    */

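A minimal sketch of how the two new lock-instrumentation keys would typically
be read; the Configuration instance here is assumed, and both lookups fall
back to the *_DEFAULT values above when the key is unset:

    long reportingThresholdMs = conf.getLong(
        OzoneConfigKeys.HDDS_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY,
        OzoneConfigKeys.HDDS_WRITE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT);
    long suppressIntervalMs = conf.getLong(
        OzoneConfigKeys.HDDS_LOCK_SUPPRESS_WARNING_INTERVAL_MS_KEY,
        OzoneConfigKeys.HDDS_LOCK_SUPPRESS_WARNING_INTERVAL_MS_DEFAULT);
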
http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
index 88645be..d29e479 100644
--- a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
+++ b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
@@ -192,6 +192,7 @@ message ContainerCommandRequestProto {
   optional   PutSmallFileRequestProto putSmallFile = 16;
   optional   GetSmallFileRequestProto getSmallFile = 17;
   optional   CloseContainerRequestProto closeContainer = 18;
+
   required   string datanodeUuid = 19;
 }
 
@@ -237,14 +238,6 @@ message ContainerData {
   optional string containerDBType = 11;
 }
 
-// This is used for create Container Request.
-message CreateContainerData {
-  required int64 containerId = 1;
-  repeated KeyValue metadata = 2;
-  optional ContainerType containerType = 3 [default = KeyValueContainer];
-}
-
-
 enum ContainerType {
   KeyValueContainer = 1;
 }
@@ -252,7 +245,9 @@ enum ContainerType {
 
 // Container Messages.
 message  CreateContainerRequestProto {
-  required ContainerData containerData = 1;
+  required int64 containerID = 1;
+  repeated KeyValue metadata = 2;
+  optional ContainerType containerType = 3 [default = KeyValueContainer];
 }
 
 message  CreateContainerResponseProto {
@@ -267,8 +262,9 @@ message  ReadContainerResponseProto {
 }
 
 message  UpdateContainerRequestProto {
-  required ContainerData containerData = 1;
-  optional bool forceUpdate = 2 [default = false];
+  required int64 containerID = 1;
+  repeated KeyValue metadata = 2;
+  optional bool forceUpdate = 3 [default = false];
 }
 
 message  UpdateContainerResponseProto {
@@ -316,7 +312,7 @@ message  PutKeyResponseProto {
 }
 
 message  GetKeyRequestProto  {
-  required KeyData keyData = 1;
+  required DatanodeBlockID blockID = 1;
 }
 
 message  GetKeyResponseProto  {

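With CreateContainerData removed, the container ID, metadata and type now sit
directly on the request messages. A sketch of building the flattened
UpdateContainerRequestProto from generated Java (the key/value pair is
illustrative):

    ContainerProtos.UpdateContainerRequestProto updateRequest =
        ContainerProtos.UpdateContainerRequestProto.newBuilder()
            .setContainerID(1L)
            .addMetadata(ContainerProtos.KeyValue.newBuilder()
                .setKey("owner").setValue("ozone").build())
            .setForceUpdate(false)
            .build();
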
http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/common/src/main/resources/ozone-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 7a91610..a3e4776 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -63,6 +63,18 @@
     </description>
   </property>
   <property>
+    <name>hdds.datanode.dir</name>
+    <value/>
+    <tag>OZONE, CONTAINER, STORAGE, MANAGEMENT</tag>
+    <description>Determines where on the local filesystem HDDS data will be
+      stored. Defaults to dfs.datanode.data.dir if not specified.
+      The directories should be tagged with corresponding storage types
+      ([SSD]/[DISK]/[ARCHIVE]/[RAM_DISK]) for storage policies. The default
+      storage type will be DISK if the directory does not have a storage type
+      tagged explicitly.
+    </description>
+  </property>
+  <property>
     <name>dfs.container.ratis.enabled</name>
     <value>false</value>
     <tag>OZONE, MANAGEMENT, PIPELINE, RATIS</tag>
@@ -1086,4 +1098,25 @@
     </description>
   </property>
 
+  <property>
+    <name>hdds.write.lock.reporting.threshold.ms</name>
+    <value>5000</value>
+    <tag>OZONE, DATANODE, MANAGEMENT</tag>
+    <description>
+      When a write lock is held for a long time, this will be logged as the
+      lock is released. This sets how long the lock must be held for logging
+      to occur.
+    </description>
+  </property>
+
+  <property>
+    <name>hdds.lock.suppress.warning.interval.ms</name>
+    <value>10000</value>
+    <tag>OZONE, DATANODE, MANAGEMENT</tag>
+    <description>
+      Instrumentation reporting long critical sections will suppress
+      consecutive warnings within this interval.
+    </description>
+  </property>
+
 </configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java
index eba8594..e0bf213 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java
@@ -313,7 +313,7 @@ public final class ChunkUtils {
    */
   public static ContainerProtos.ContainerCommandResponseProto
       getChunkResponse(ContainerProtos.ContainerCommandRequestProto msg) {
-    return ContainerUtils.getContainerResponse(msg);
+    return ContainerUtils.getSuccessResponse(msg);
   }
 
   /**
@@ -336,8 +336,7 @@ public final class ChunkUtils {
     response.setBlockID(msg.getReadChunk().getBlockID());
 
     ContainerProtos.ContainerCommandResponseProto.Builder builder =
-        ContainerUtils.getContainerResponse(msg, ContainerProtos.Result
-            .SUCCESS, "");
+        ContainerUtils.getSuccessResponseBuilder(msg);
     builder.setReadChunk(response);
     return builder.build();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
index 9b52316..b975217 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
@@ -22,13 +22,20 @@ import com.google.common.base.Preconditions;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.hdds.scm.container.common.helpers
-    .StorageContainerException;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers
+    .StorageContainerException;
 import org.apache.hadoop.ozone.OzoneConsts;
+import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_EXTENSION;
 import org.apache.hadoop.ozone.container.common.impl.ContainerManagerImpl;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.utils.MetadataStore;
 import org.apache.hadoop.utils.MetadataStoreBuilder;
 import org.slf4j.Logger;
@@ -42,12 +49,14 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 
 import static org.apache.commons.io.FilenameUtils.removeExtension;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result
-    .INVALID_ARGUMENT;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result
-    .UNABLE_TO_FIND_DATA_DIR;
-import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_EXTENSION;
-
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .Result.CLOSED_CONTAINER_IO;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .Result.INVALID_CONTAINER_STATE;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .Result.SUCCESS;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .Result.UNABLE_TO_FIND_DATA_DIR;
 
 /**
  * A set of helper functions to create proper responses.
@@ -59,28 +68,61 @@ public final class ContainerUtils {
   }
 
   /**
-   * Returns a CreateContainer Response. This call is used by create and delete
-   * containers which have null success responses.
-   *
-   * @param msg Request
-   * @return Response.
+   * Returns a Container Command Response Builder with the specified result
+   * and message.
+   * @param request requestProto message.
+   * @param result result of the command.
+   * @param message response message.
+   * @return ContainerCommand Response Builder.
    */
-  public static ContainerProtos.ContainerCommandResponseProto
-      getContainerResponse(ContainerProtos.ContainerCommandRequestProto msg) {
-    ContainerProtos.ContainerCommandResponseProto.Builder builder =
-        getContainerResponse(msg, ContainerProtos.Result.SUCCESS, "");
+  public static ContainerCommandResponseProto.Builder
+  getContainerCommandResponse(
+      ContainerCommandRequestProto request, Result result, String message) {
+    return
+        ContainerCommandResponseProto.newBuilder()
+            .setCmdType(request.getCmdType())
+            .setTraceID(request.getTraceID())
+            .setResult(result)
+            .setMessage(message);
+  }
+
+  /**
+   * Returns a Container Command Response Builder. This call is used to build
+   * success responses. Calling function can add other fields to the response
+   * as required.
+   * @param request requestProto message.
+   * @return ContainerCommand Response Builder with result as SUCCESS.
+   */
+  public static ContainerCommandResponseProto.Builder getSuccessResponseBuilder(
+      ContainerCommandRequestProto request) {
+    return
+        ContainerCommandResponseProto.newBuilder()
+            .setCmdType(request.getCmdType())
+            .setTraceID(request.getTraceID())
+            .setResult(Result.SUCCESS);
+  }
+
+  /**
+   * Returns a Container Command Response. This call is used for creating null
+   * success responses.
+   * @param request requestProto message.
+   * @return ContainerCommand Response with result as SUCCESS.
+   */
+  public static ContainerCommandResponseProto getSuccessResponse(
+      ContainerCommandRequestProto request) {
+    ContainerCommandResponseProto.Builder builder =
+        getContainerCommandResponse(request, Result.SUCCESS, "");
     return builder.build();
   }
 
   /**
    * Returns a ReadContainer Response.
-   *
-   * @param msg Request
-   * @param containerData - data
-   * @return Response.
+   * @param msg requestProto message.
+   * @param containerData container data to be returned.
+   * @return ReadContainer Response
    */
   public static ContainerProtos.ContainerCommandResponseProto
-      getReadContainerResponse(ContainerProtos.ContainerCommandRequestProto msg,
+    getReadContainerResponse(ContainerProtos.ContainerCommandRequestProto msg,
       ContainerData containerData) {
     Preconditions.checkNotNull(containerData);
 
@@ -89,7 +131,7 @@ public final class ContainerUtils {
     response.setContainerData(containerData.getProtoBufMessage());
 
     ContainerProtos.ContainerCommandResponseProto.Builder builder =
-        getContainerResponse(msg, ContainerProtos.Result.SUCCESS, "");
+        getSuccessResponseBuilder(msg);
     builder.setReadContainer(response);
     return builder.build();
   }
@@ -98,37 +140,25 @@ public final class ContainerUtils {
    * We found a command type but no associated payload for the command. Hence
    * return malformed Command as response.
    *
-   * @param msg - Protobuf message.
-   * @param result - result
-   * @param message - Error message.
+   * @param request - Protobuf message.
    * @return ContainerCommandResponseProto - MALFORMED_REQUEST.
    */
-  public static ContainerProtos.ContainerCommandResponseProto.Builder
-      getContainerResponse(ContainerProtos.ContainerCommandRequestProto msg,
-      ContainerProtos.Result result, String message) {
-    return
-        ContainerProtos.ContainerCommandResponseProto.newBuilder()
-            .setCmdType(msg.getCmdType())
-            .setTraceID(msg.getTraceID())
-            .setResult(result)
-            .setMessage(message);
+  public static ContainerCommandResponseProto malformedRequest(
+      ContainerCommandRequestProto request) {
+    return getContainerCommandResponse(request, Result.MALFORMED_REQUEST,
+        "Cmd type does not match the payload.").build();
   }
 
   /**
-   * Logs the error and returns a response to the caller.
+   * We found a command type that is not supported yet.
    *
-   * @param log - Logger
-   * @param ex - Exception
-   * @param msg - Request Object
-   * @return Response
+   * @param request - Protobuf message.
+   * @return ContainerCommandResponseProto - UNSUPPORTED_REQUEST.
    */
-  public static ContainerProtos.ContainerCommandResponseProto logAndReturnError(
-      Logger log, StorageContainerException ex,
-      ContainerProtos.ContainerCommandRequestProto msg) {
-    log.info("Operation: {} : Trace ID: {} : Message: {} : Result: {}",
-        msg.getCmdType().name(), msg.getTraceID(),
-        ex.getMessage(), ex.getResult().getValueDescriptor().getName());
-    return getContainerResponse(msg, ex.getResult(), ex.getMessage()).build();
+  public static ContainerCommandResponseProto unsupportedRequest(
+      ContainerCommandRequestProto request) {
+    return getContainerCommandResponse(request, Result.UNSUPPORTED_REQUEST,
+        "Server does not support this command yet.").build();
   }
 
   /**
@@ -136,40 +166,17 @@ public final class ContainerUtils {
    *
    * @param log - Logger
    * @param ex - Exception
-   * @param msg - Request Object
+   * @param request - Request Object
    * @return Response
    */
-  public static ContainerProtos.ContainerCommandResponseProto logAndReturnError(
-      Logger log, RuntimeException ex,
-      ContainerProtos.ContainerCommandRequestProto msg) {
-    log.info("Operation: {} : Trace ID: {} : Message: {} ",
-        msg.getCmdType().name(), msg.getTraceID(), ex.getMessage());
-    return getContainerResponse(msg, INVALID_ARGUMENT, ex.getMessage()).build();
-  }
-
-  /**
-   * We found a command type but no associated payload for the command. Hence
-   * return malformed Command as response.
-   *
-   * @param msg - Protobuf message.
-   * @return ContainerCommandResponseProto - MALFORMED_REQUEST.
-   */
-  public static ContainerProtos.ContainerCommandResponseProto
-      malformedRequest(ContainerProtos.ContainerCommandRequestProto msg) {
-    return getContainerResponse(msg, ContainerProtos.Result.MALFORMED_REQUEST,
-        "Cmd type does not match the payload.").build();
-  }
-
-  /**
-   * We found a command type that is not supported yet.
-   *
-   * @param msg - Protobuf message.
-   * @return ContainerCommandResponseProto - MALFORMED_REQUEST.
-   */
-  public static ContainerProtos.ContainerCommandResponseProto
-      unsupportedRequest(ContainerProtos.ContainerCommandRequestProto msg) {
-    return getContainerResponse(msg, ContainerProtos.Result.UNSUPPORTED_REQUEST,
-        "Server does not support this command yet.").build();
+  public static ContainerCommandResponseProto logAndReturnError(
+      Logger log, StorageContainerException ex,
+      ContainerCommandRequestProto request) {
+    log.info("Operation: {} : Trace ID: {} : Message: {} : Result: {}",
+        request.getCmdType().name(), request.getTraceID(),
+        ex.getMessage(), ex.getResult().getValueDescriptor().getName());
+    return getContainerCommandResponse(request, ex.getResult(), ex.getMessage())
+        .build();
   }
 
   /**
@@ -191,7 +198,7 @@ public final class ContainerUtils {
   }
 
   /**
-   * Verifies that this in indeed a new container.
+   * Verifies that this is indeed a new container.
    *
    * @param containerFile - Container File to verify
    * @throws IOException
@@ -343,7 +350,7 @@ public final class ContainerUtils {
     if(!forceDelete && !db.isEmpty()) {
       throw new StorageContainerException(
           "Container cannot be deleted because it is not empty.",
-          ContainerProtos.Result.ERROR_CONTAINER_NOT_EMPTY);
+          Result.ERROR_CONTAINER_NOT_EMPTY);
     }
     // Close the DB connection and remove the DB handler from cache
     KeyUtils.removeDB(containerData, conf);

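The refactored helpers split response construction into a reusable builder
plus terminal helpers. A sketch of the intended usage from a handler, where
request, getKeyPayload, LOG and ex are assumed:

    // Success path: start from a SUCCESS builder, attach the
    // command-specific payload, then build.
    ContainerCommandResponseProto response =
        ContainerUtils.getSuccessResponseBuilder(request)
            .setGetKey(getKeyPayload)
            .build();

    // Error path: the result code and message come from the exception.
    ContainerCommandResponseProto error =
        ContainerUtils.logAndReturnError(LOG, ex, request);
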
http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/FileUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/FileUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/FileUtils.java
deleted file mode 100644
index a2875be..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/FileUtils.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.ozone.container.common.helpers;
-
-import com.google.common.base.Preconditions;
-import org.apache.ratis.shaded.com.google.protobuf.ByteString;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-
-/**
- * File Utils are helper routines used by putSmallFile and getSmallFile
- * RPCs.
- */
-public final class FileUtils {
-  /**
-   * Never Constructed.
-   */
-  private FileUtils() {
-  }
-
-  /**
-   * Gets a response for the putSmallFile RPC.
-   * @param msg - ContainerCommandRequestProto
-   * @return - ContainerCommandResponseProto
-   */
-  public static ContainerProtos.ContainerCommandResponseProto
-      getPutFileResponse(ContainerProtos.ContainerCommandRequestProto msg) {
-    ContainerProtos.PutSmallFileResponseProto.Builder getResponse =
-        ContainerProtos.PutSmallFileResponseProto.newBuilder();
-    ContainerProtos.ContainerCommandResponseProto.Builder builder =
-        ContainerUtils.getContainerResponse(msg, ContainerProtos.Result
-            .SUCCESS, "");
-    builder.setCmdType(ContainerProtos.Type.PutSmallFile);
-    builder.setPutSmallFile(getResponse);
-    return  builder.build();
-  }
-
-  /**
-   * Gets a response to the read small file call.
-   * @param msg - Msg
-   * @param data  - Data
-   * @param info  - Info
-   * @return    Response.
-   */
-  public static ContainerProtos.ContainerCommandResponseProto
-      getGetSmallFileResponse(ContainerProtos.ContainerCommandRequestProto msg,
-      byte[] data, ChunkInfo info) {
-    Preconditions.checkNotNull(msg);
-
-    ContainerProtos.ReadChunkResponseProto.Builder readChunkresponse =
-        ContainerProtos.ReadChunkResponseProto.newBuilder();
-    readChunkresponse.setChunkData(info.getProtoBufMessage());
-    readChunkresponse.setData(ByteString.copyFrom(data));
-    readChunkresponse.setBlockID(msg.getGetSmallFile().getKey().
-        getKeyData().getBlockID());
-
-    ContainerProtos.GetSmallFileResponseProto.Builder getSmallFile =
-        ContainerProtos.GetSmallFileResponseProto.newBuilder();
-    getSmallFile.setData(readChunkresponse.build());
-    ContainerProtos.ContainerCommandResponseProto.Builder builder =
-        ContainerUtils.getContainerResponse(msg, ContainerProtos.Result
-            .SUCCESS, "");
-    builder.setCmdType(ContainerProtos.Type.GetSmallFile);
-    builder.setGetSmallFile(getSmallFile);
-    return builder.build();
-  }
-
-}

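The small-file helpers do not go away; they move to the keyvalue package as
SmallFileUtils with renamed entry points, as the Dispatcher changes below
show. A sketch of the mapping, with msg, data and chunkInfo assumed:

    // Was: FileUtils.getPutFileResponse(msg)
    ContainerCommandResponseProto putResp =
        SmallFileUtils.getPutFileResponseSuccess(msg);
    // Was: FileUtils.getGetSmallFileResponse(msg, data, chunkInfo)
    ContainerCommandResponseProto getResp =
        SmallFileUtils.getGetSmallFileResponseSuccess(msg, data, chunkInfo);
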
http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java
index d52bc18..a710864 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java
@@ -103,7 +103,7 @@ public final class KeyUtils {
    */
   public static ContainerProtos.ContainerCommandResponseProto
       getKeyResponse(ContainerProtos.ContainerCommandRequestProto msg) {
-    return ContainerUtils.getContainerResponse(msg);
+    return ContainerUtils.getSuccessResponse(msg);
   }
 
 
@@ -114,8 +114,7 @@ public final class KeyUtils {
         .GetKeyResponseProto.newBuilder();
     getKey.setKeyData(data.getProtoBufMessage());
     ContainerProtos.ContainerCommandResponseProto.Builder builder =
-        ContainerUtils.getContainerResponse(msg, ContainerProtos.Result
-            .SUCCESS, "");
+        ContainerUtils.getSuccessResponseBuilder(msg);
     builder.setGetKey(getKey);
     return  builder.build();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
index 79f038f..18a7839 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.container.common.impl;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerInfo;
@@ -68,8 +69,10 @@ public class ContainerSet {
           containerId);
       return true;
     } else {
-      LOG.debug("Container already exists with container Id {}", containerId);
-      return false;
+      LOG.warn("Container already exists with container Id {}", containerId);
+      throw new StorageContainerException("Container already exists with " +
+          "container Id " + containerId,
+          ContainerProtos.Result.CONTAINER_EXISTS);
     }
   }
 

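Callers of addContainer now see a duplicate ID as an error rather than a
boolean return. A sketch of the adjusted calling convention, with
containerSet, container, containerID and LOG assumed:

    try {
      containerSet.addContainer(container);
    } catch (StorageContainerException e) {
      // CONTAINER_EXISTS replaces the old "return false" duplicate signal.
      LOG.error("Container {} already exists", containerID, e);
    }
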
http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java
index 3b478cd..b5fb08d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.ozone.container.common.helpers.ChunkUtils;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
-import org.apache.hadoop.ozone.container.common.helpers.FileUtils;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.SmallFileUtils;
 import org.apache.hadoop.ozone.container.common.helpers.KeyData;
 import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
@@ -138,8 +138,6 @@ public class Dispatcher implements ContainerDispatcher {
     } catch (StorageContainerException e) {
      // This is useful since the trace ID will allow us to correlate failures.
       return ContainerUtils.logAndReturnError(LOG, e, msg);
-    } catch (IllegalStateException | NullPointerException e) {
-      return ContainerUtils.logAndReturnError(LOG, e, msg);
     }
   }
 
@@ -186,13 +184,13 @@ public class Dispatcher implements ContainerDispatcher {
     } catch (IOException ex) {
       LOG.warn("Container operation failed. " +
               "Container: {} Operation: {}  trace ID: {} Error: {}",
-          msg.getCreateContainer().getContainerData().getContainerID(),
+          msg.getCreateContainer().getContainerID(),
           msg.getCmdType().name(),
           msg.getTraceID(),
           ex.toString(), ex);
 
       // TODO : Replace with finer error codes.
-      return ContainerUtils.getContainerResponse(msg,
+      return ContainerUtils.getContainerCommandResponse(msg,
           ContainerProtos.Result.CONTAINER_INTERNAL_ERROR,
           ex.toString()).build();
     }
@@ -230,13 +228,13 @@ public class Dispatcher implements ContainerDispatcher {
     } catch (IOException ex) {
       LOG.warn("Container operation failed. " +
               "Container: {} Operation: {}  trace ID: {} Error: {}",
-          msg.getCreateContainer().getContainerData().getContainerID(),
+          msg.getCreateContainer().getContainerID(),
           msg.getCmdType().name(),
           msg.getTraceID(),
           ex.toString(), ex);
 
       // TODO : Replace with finer error codes.
-      return ContainerUtils.getContainerResponse(msg,
+      return ContainerUtils.getContainerCommandResponse(msg,
           ContainerProtos.Result.CONTAINER_INTERNAL_ERROR,
           ex.toString()).build();
     }
@@ -273,13 +271,13 @@ public class Dispatcher implements ContainerDispatcher {
     } catch (IOException ex) {
       LOG.warn("Container operation failed. " +
               "Container: {} Operation: {}  trace ID: {} Error: {}",
-          msg.getCreateContainer().getContainerData().getContainerID(),
+          msg.getCreateContainer().getContainerID(),
           msg.getCmdType().name(),
           msg.getTraceID(),
           ex.toString(), ex);
 
       // TODO : Replace with finer error codes.
-      return ContainerUtils.getContainerResponse(msg,
+      return ContainerUtils.getContainerCommandResponse(msg,
           ContainerProtos.Result.CONTAINER_INTERNAL_ERROR,
           ex.toString()).build();
     }
@@ -318,15 +316,14 @@ public class Dispatcher implements ContainerDispatcher {
           msg.getTraceID());
       return ContainerUtils.malformedRequest(msg);
     }
-    long containerID = msg.getUpdateContainer()
-        .getContainerData().getContainerID();
+    long containerID = msg.getUpdateContainer().getContainerID();
 
-    ContainerData data = ContainerData.getFromProtBuf(
-        msg.getUpdateContainer().getContainerData(), conf);
+    ContainerData data = new ContainerData(msg.getUpdateContainer()
+        .getContainerID(), conf);
     boolean forceUpdate = msg.getUpdateContainer().getForceUpdate();
     this.containerManager.updateContainer(containerID,
         data, forceUpdate);
-    return ContainerUtils.getContainerResponse(msg);
+    return ContainerUtils.getSuccessResponse(msg);
   }
 
   /**
@@ -371,7 +368,7 @@ public class Dispatcher implements ContainerDispatcher {
     long containerID = msg.getDeleteContainer().getContainerID();
     boolean forceDelete = msg.getDeleteContainer().getForceDelete();
     this.containerManager.deleteContainer(containerID, forceDelete);
-    return ContainerUtils.getContainerResponse(msg);
+    return ContainerUtils.getSuccessResponse(msg);
   }
 
   /**
@@ -388,12 +385,11 @@ public class Dispatcher implements ContainerDispatcher {
           msg.getTraceID());
       return ContainerUtils.malformedRequest(msg);
     }
-    ContainerData cData = ContainerData.getFromProtBuf(
-        msg.getCreateContainer().getContainerData(), conf);
-    Preconditions.checkNotNull(cData, "Container data is null");
+    ContainerData cData = new ContainerData(
+        msg.getCreateContainer().getContainerID(), conf);
 
     this.containerManager.createContainer(cData);
-    return ContainerUtils.getContainerResponse(msg);
+    return ContainerUtils.getSuccessResponse(msg);
   }
 
   /**
@@ -417,7 +413,7 @@ public class Dispatcher implements ContainerDispatcher {
             "container.", CLOSED_CONTAINER_IO);
       }
       this.containerManager.closeContainer(containerID);
-      return ContainerUtils.getContainerResponse(msg);
+      return ContainerUtils.getSuccessResponse(msg);
     } catch (NoSuchAlgorithmException e) {
       throw new StorageContainerException("No such Algorithm", e,
           NO_SUCH_ALGORITHM);
@@ -561,7 +557,8 @@ public class Dispatcher implements ContainerDispatcher {
           msg.getTraceID());
       return ContainerUtils.malformedRequest(msg);
     }
-    KeyData keyData = KeyData.getFromProtoBuf(msg.getGetKey().getKeyData());
+    KeyData keyData = new KeyData(
+        BlockID.getFromProtobuf(msg.getGetKey().getBlockID()));
     Preconditions.checkNotNull(keyData);
     KeyData responseData =
         this.containerManager.getKeyManager().getKey(keyData);
@@ -634,7 +631,7 @@ public class Dispatcher implements ContainerDispatcher {
       chunks.add(chunkInfo.getProtoBufMessage());
       keyData.setChunks(chunks);
       this.containerManager.getKeyManager().putKey(keyData);
-      return FileUtils.getPutFileResponse(msg);
+      return SmallFileUtils.getPutFileResponseSuccess(msg);
     } catch (StorageContainerException e) {
       return ContainerUtils.logAndReturnError(LOG, e, msg);
     } catch (IOException e) {
@@ -661,8 +658,8 @@ public class Dispatcher implements ContainerDispatcher {
     }
     try {
       long bytes = 0;
-      KeyData keyData = KeyData.getFromProtoBuf(msg.getGetSmallFile()
-          .getKey().getKeyData());
+      KeyData keyData = new KeyData(BlockID.getFromProtobuf(
+          msg.getGetSmallFile().getKey().getBlockID()));
       KeyData data = this.containerManager.getKeyManager().getKey(keyData);
       ContainerProtos.ChunkInfo c = null;
       for (ContainerProtos.ChunkInfo chunk : data.getChunks()) {
@@ -675,8 +672,8 @@ public class Dispatcher implements ContainerDispatcher {
         c = chunk;
       }
       metrics.incContainerBytesStats(Type.GetSmallFile, bytes);
-      return FileUtils.getGetSmallFileResponse(msg, dataBuf.toByteArray(),
-          ChunkInfo.getFromProtoBuf(c));
+      return SmallFileUtils.getGetSmallFileResponseSuccess(
+          msg, dataBuf.toByteArray(), ChunkInfo.getFromProtoBuf(c));
     } catch (StorageContainerException e) {
       return ContainerUtils.logAndReturnError(LOG, e, msg);
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
new file mode 100644
index 0000000..e73b761
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.impl;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.interfaces.Handler;
+import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerType;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Ozone Container dispatcher takes a call from the netty server and routes it
+ * to the right handler function.
+ */
+public class HddsDispatcher implements ContainerDispatcher {
+
+  static final Logger LOG = LoggerFactory.getLogger(HddsDispatcher.class);
+
+  private final Map<ContainerType, Handler> handlers;
+  private final Configuration conf;
+  private final ContainerSet containerSet;
+  private final VolumeSet volumeSet;
+  private final String scmID;
+
+  /**
+   * Constructs an HddsDispatcher that receives calls from
+   * XceiverServerHandler.
+   */
+  public HddsDispatcher(Configuration config, ContainerSet contSet,
+      VolumeSet volumes, String scmId) {
+    // TODO: Pass ContainerSet, VolumeSet and scmID, initialize metrics
+    this.conf = config;
+    this.containerSet = contSet;
+    this.volumeSet = volumes;
+    this.scmID = scmId;
+    this.handlers = Maps.newHashMap();
+    for (ContainerType containerType : ContainerType.values()) {
+      handlers.put(containerType,
+          Handler.getHandlerForContainerType(
+              containerType, conf, containerSet, volumeSet, scmID));
+    }
+  }
+
+  @Override
+  public void init() {
+  }
+
+  @Override
+  public void shutdown() {
+  }
+
+  @Override
+  public ContainerCommandResponseProto dispatch(
+      ContainerCommandRequestProto msg) {
+    LOG.trace("Command {}, trace ID: {} ", msg.getCmdType().toString(),
+        msg.getTraceID());
+    Preconditions.checkNotNull(msg);
+
+    Container container = null;
+    ContainerType containerType = null;
+    try {
+      long containerID = getContainerID(msg);
+
+      if (msg.getCmdType() != ContainerProtos.Type.CreateContainer) {
+        container = getContainer(containerID);
+        containerType = getContainerType(container);
+      } else {
+        containerType = msg.getCreateContainer().getContainerType();
+      }
+    } catch (StorageContainerException ex) {
+      return ContainerUtils.logAndReturnError(LOG, ex, msg);
+    }
+
+    Handler handler = getHandlerForContainerType(containerType);
+    if (handler == null) {
+      StorageContainerException ex = new StorageContainerException("Invalid " +
+          "ContainerType " + containerType,
+          ContainerProtos.Result.CONTAINER_INTERNAL_ERROR);
+      return ContainerUtils.logAndReturnError(LOG, ex, msg);
+    }
+    return handler.handle(msg, container);
+  }
+
+  @VisibleForTesting
+  public Handler getHandlerForContainerType(ContainerType type) {
+    return handlers.get(type);
+  }
+
+  private long getContainerID(ContainerCommandRequestProto request)
+      throws StorageContainerException {
+    ContainerProtos.Type cmdType = request.getCmdType();
+
+    switch(cmdType) {
+    case CreateContainer:
+      return request.getCreateContainer().getContainerID();
+    case ReadContainer:
+      return request.getReadContainer().getContainerID();
+    case UpdateContainer:
+      return request.getUpdateContainer().getContainerID();
+    case DeleteContainer:
+      return request.getDeleteContainer().getContainerID();
+    case ListContainer:
+      return request.getListContainer().getStartContainerID();
+    case CloseContainer:
+      return request.getCloseContainer().getContainerID();
+    case PutKey:
+      return request.getPutKey().getKeyData().getBlockID().getContainerID();
+    case GetKey:
+      return request.getGetKey().getBlockID().getContainerID();
+    case DeleteKey:
+      return request.getDeleteKey().getBlockID().getContainerID();
+    case ListKey:
+      return request.getListKey().getContainerID();
+    case ReadChunk:
+      return request.getReadChunk().getBlockID().getContainerID();
+    case DeleteChunk:
+      return request.getDeleteChunk().getBlockID().getContainerID();
+    case WriteChunk:
+      return request.getWriteChunk().getBlockID().getContainerID();
+    case ListChunk:
+      return request.getListChunk().getBlockID().getContainerID();
+    case PutSmallFile:
+      return request.getPutSmallFile().getKey().getKeyData().getBlockID()
+          .getContainerID();
+    case GetSmallFile:
+      return request.getGetSmallFile().getKey().getBlockID().getContainerID();
+    }
+
+    throw new StorageContainerException(
+        ContainerProtos.Result.UNSUPPORTED_REQUEST);
+  }
+
+  @VisibleForTesting
+  public Container getContainer(long containerID)
+      throws StorageContainerException {
+    Container container = containerSet.getContainer(containerID);
+    if (container == null) {
+      throw new StorageContainerException(
+          "ContainerID " + containerID + " does not exist",
+          ContainerProtos.Result.CONTAINER_NOT_FOUND);
+    }
+    return container;
+  }
+
+  private ContainerType getContainerType(Container container) {
+    return container.getContainerType();
+  }
+}

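A sketch of how the new dispatcher is wired and invoked; conf, containerSet,
volumeSet, scmId and request here are assumed, not part of this file:

    HddsDispatcher dispatcher =
        new HddsDispatcher(conf, containerSet, volumeSet, scmId);
    dispatcher.init();
    // Routing: resolve the container ID from the request, look up the
    // container (except for CreateContainer), then delegate to the
    // per-ContainerType Handler.
    ContainerCommandResponseProto response = dispatcher.dispatch(request);
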
http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyValueContainerData.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyValueContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyValueContainerData.java
deleted file mode 100644
index b74bab2..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyValueContainerData.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.impl;
-
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-
-
-import java.io.File;
-import java.io.IOException;
-
-/**
- * This class represents the KeyValueContainer metadata, which is the
- * in-memory representation of container metadata and is represented on disk
- * by the .container file.
- */
-public class KeyValueContainerData extends ContainerData {
-
-  // Path to Container metadata Level DB/RocksDB Store and .container file.
-  private String metadataPath;
-
-  // Path to Physical file system where chunks are stored.
-  private String chunksPath;
-
-  //Type of DB used to store key to chunks mapping
-  private String containerDBType;
-
-  //Number of pending deletion blocks in container.
-  private int numPendingDeletionBlocks;
-
-  private File dbFile = null;
-
-  /**
-   * Constructs KeyValueContainerData object.
-   * @param type - containerType
-   * @param id - ContainerId
-   */
-  public KeyValueContainerData(ContainerProtos.ContainerType type, long id) {
-    super(type, id);
-    this.numPendingDeletionBlocks = 0;
-  }
-
-  /**
-   * Constructs KeyValueContainerData object.
-   * @param type - containerType
-   * @param id - ContainerId
-   * @param layOutVersion
-   */
-  public KeyValueContainerData(ContainerProtos.ContainerType type, long id,
-                               int layOutVersion) {
-    super(type, id, layOutVersion);
-    this.numPendingDeletionBlocks = 0;
-  }
-
-
-  /**
-   * Sets Container dbFile. This should be called only during creation of
-   * KeyValue container.
-   * @param containerDbFile
-   */
-  public void setDbFile(File containerDbFile) {
-    dbFile = containerDbFile;
-  }
-
-  /**
-   * Returns container DB file.
-   * @return dbFile
-   */
-  public File getDbFile() {
-    return dbFile;
-  }
-  /**
-   * Returns container metadata path.
-   *
-   * @return - path
-   */
-  public String getMetadataPath() {
-    return metadataPath;
-  }
-
-  /**
-   * Sets container metadata path.
-   *
-   * @param path - String.
-   */
-  public void setMetadataPath(String path) {
-    this.metadataPath = path;
-  }
-
-  /**
-   * Get chunks path.
-   * @return - Physical path where container file and checksum is stored.
-   */
-  public String getChunksPath() {
-    return chunksPath;
-  }
-
-  /**
-   * Set chunks Path.
-   * @param chunkPath - File path.
-   */
-  public void setChunksPath(String chunkPath) {
-    this.chunksPath = chunkPath;
-  }
-
-  /**
-   * Returns the DBType used for the container.
-   * @return containerDBType
-   */
-  public String getContainerDBType() {
-    return containerDBType;
-  }
-
-  /**
-   * Sets the DBType used for the container.
-   * @param containerDBType
-   */
-  public void setContainerDBType(String containerDBType) {
-    this.containerDBType = containerDBType;
-  }
-
-  /**
-   * Returns the number of pending deletion blocks in container.
-   * @return numPendingDeletionBlocks
-   */
-  public int getNumPendingDeletionBlocks() {
-    return numPendingDeletionBlocks;
-  }
-
-
-  /**
-   * Increase the count of pending deletion blocks.
-   *
-   * @param numBlocks increment number
-   */
-  public void incrPendingDeletionBlocks(int numBlocks) {
-    this.numPendingDeletionBlocks += numBlocks;
-  }
-
-  /**
-   * Decrease the count of pending deletion blocks.
-   *
-   * @param numBlocks decrement number
-   */
-  public void decrPendingDeletionBlocks(int numBlocks) {
-    this.numPendingDeletionBlocks -= numBlocks;
-  }
-
-
-  /**
-   * Constructs a KeyValueContainerData object from ProtoBuf classes.
-   *
-   * @param protoData - ProtoBuf Message
-   * @throws IOException
-   */
-  public static KeyValueContainerData getFromProtoBuf(
-      ContainerProtos.CreateContainerData protoData) throws IOException {
-
-    long containerID;
-    ContainerProtos.ContainerType containerType;
-
-    containerID = protoData.getContainerId();
-    containerType = protoData.getContainerType();
-
-    KeyValueContainerData keyValueContainerData = new KeyValueContainerData(
-        containerType, containerID);
-
-    for (int x = 0; x < protoData.getMetadataCount(); x++) {
-      keyValueContainerData.addMetadata(protoData.getMetadata(x).getKey(),
-          protoData.getMetadata(x).getValue());
-    }
-
-    return keyValueContainerData;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyValueYaml.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyValueYaml.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyValueYaml.java
deleted file mode 100644
index d22092c..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyValueYaml.java
+++ /dev/null
@@ -1,275 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.impl;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.yaml.snakeyaml.Yaml;
-
-
-import java.beans.IntrospectionException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.Writer;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.OutputStreamWriter;
-
-import java.io.File;
-
-
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.Map;
-
-import org.yaml.snakeyaml.constructor.AbstractConstruct;
-import org.yaml.snakeyaml.constructor.Constructor;
-import org.yaml.snakeyaml.introspector.BeanAccess;
-import org.yaml.snakeyaml.introspector.Property;
-import org.yaml.snakeyaml.introspector.PropertyUtils;
-import org.yaml.snakeyaml.nodes.MappingNode;
-import org.yaml.snakeyaml.nodes.Node;
-import org.yaml.snakeyaml.nodes.ScalarNode;
-import org.yaml.snakeyaml.nodes.Tag;
-import org.yaml.snakeyaml.representer.Representer;
-
-/**
- * Class for creating and reading .container files.
- */
-
-public final class KeyValueYaml {
-
-  private KeyValueYaml() {
-
-  }
-  /**
-   * Creates a .container file in yaml format.
-   *
-   * @param containerFile
-   * @param containerData
-   * @throws IOException
-   */
-  public static void createContainerFile(File containerFile, ContainerData
-      containerData) throws IOException {
-
-    Preconditions.checkNotNull(containerFile, "yamlFile cannot be null");
-    Preconditions.checkNotNull(containerData, "containerData cannot be null");
-
-    PropertyUtils propertyUtils = new PropertyUtils();
-    propertyUtils.setBeanAccess(BeanAccess.FIELD);
-    propertyUtils.setAllowReadOnlyProperties(true);
-
-    Representer representer = new KeyValueContainerDataRepresenter();
-    representer.setPropertyUtils(propertyUtils);
-    representer.addClassTag(org.apache.hadoop.ozone.container.common.impl
-        .KeyValueContainerData.class, new Tag("KeyValueContainerData"));
-
-    Constructor keyValueDataConstructor = new KeyValueDataConstructor();
-
-    Yaml yaml = new Yaml(keyValueDataConstructor, representer);
-
-    Writer writer = new OutputStreamWriter(new FileOutputStream(containerFile),
-        "UTF-8");
-    yaml.dump(containerData, writer);
-    writer.close();
-  }
-
-  /**
-   * Read the yaml file, and return containerData.
-   *
-   * @param containerFile
-   * @throws IOException
-   */
-  public static KeyValueContainerData readContainerFile(File containerFile)
-      throws IOException {
-    Preconditions.checkNotNull(containerFile, "containerFile cannot be null");
-
-    InputStream input = null;
-    KeyValueContainerData keyValueContainerData;
-    try {
-      PropertyUtils propertyUtils = new PropertyUtils();
-      propertyUtils.setBeanAccess(BeanAccess.FIELD);
-      propertyUtils.setAllowReadOnlyProperties(true);
-
-      Representer representer = new KeyValueContainerDataRepresenter();
-      representer.setPropertyUtils(propertyUtils);
-      representer.addClassTag(org.apache.hadoop.ozone.container.common.impl
-          .KeyValueContainerData.class, new Tag("KeyValueContainerData"));
-
-      Constructor keyValueDataConstructor = new KeyValueDataConstructor();
-
-      Yaml yaml = new Yaml(keyValueDataConstructor, representer);
-      yaml.setBeanAccess(BeanAccess.FIELD);
-
-      input = new FileInputStream(containerFile);
-      keyValueContainerData = (KeyValueContainerData)
-          yaml.load(input);
-    } finally {
-      if (input!= null) {
-        input.close();
-      }
-    }
-    return keyValueContainerData;
-  }
-
-  /**
-   * Representer class to define which fields need to be stored in yaml file.
-   */
-  private static class KeyValueContainerDataRepresenter extends Representer {
-    @Override
-    protected Set<Property> getProperties(Class<? extends Object> type)
-        throws IntrospectionException {
-      Set<Property> set = super.getProperties(type);
-      Set<Property> filtered = new TreeSet<Property>();
-      if (type.equals(KeyValueContainerData.class)) {
-        // filter properties
-        for (Property prop : set) {
-          String name = prop.getName();
-          // When a new field needs to be added, it needs to be added here.
-          if (name.equals("containerType") || name.equals("containerId") ||
-              name.equals("layOutVersion") || name.equals("state") ||
-              name.equals("metadata") || name.equals("metadataPath") ||
-              name.equals("chunksPath") || name.equals(
-                  "containerDBType")) {
-            filtered.add(prop);
-          }
-        }
-      }
-      return filtered;
-    }
-  }
-
-  /**
-   * Constructor class for KeyValueData, which will be used by Yaml.
-   */
-  private static class KeyValueDataConstructor extends Constructor {
-    KeyValueDataConstructor() {
-      //Adding our own specific constructors for tags.
-      this.yamlConstructors.put(new Tag("KeyValueContainerData"),
-          new ConstructKeyValueContainerData());
-      this.yamlConstructors.put(Tag.INT, new ConstructLong());
-    }
-
-    private class ConstructKeyValueContainerData extends AbstractConstruct {
-      public Object construct(Node node) {
-        MappingNode mnode = (MappingNode) node;
-        Map<Object, Object> nodes = constructMapping(mnode);
-        String type = (String) nodes.get("containerType");
-
-        ContainerProtos.ContainerType containerType = ContainerProtos
-            .ContainerType.KeyValueContainer;
-        if (type.equals("KeyValueContainer")) {
-          containerType = ContainerProtos.ContainerType.KeyValueContainer;
-        }
-
-        //Needed this, as TAG.INT type is by default converted to Long.
-        long layOutVersion = (long) nodes.get("layOutVersion");
-        int lv = (int) layOutVersion;
-
-        //When a new field is added, it needs to be added here.
-        KeyValueContainerData kvData = new KeyValueContainerData(containerType,
-            (long) nodes.get("containerId"), lv);
-        kvData.setContainerDBType((String)nodes.get("containerDBType"));
-        kvData.setMetadataPath((String) nodes.get(
-            "metadataPath"));
-        kvData.setChunksPath((String) nodes.get("chunksPath"));
-        Map<String, String> meta = (Map) nodes.get("metadata");
-        meta.forEach((key, val) -> {
-          try {
-            kvData.addMetadata(key, val);
-          } catch (IOException e) {
-            throw new IllegalStateException("Unexpected " +
-                "Key Value Pair " + "(" + key + "," + val +")in the metadata " 
+
-                "for containerId " + (long) nodes.get("containerId"));
-          }
-        });
-        String state = (String) nodes.get("state");
-        switch (state) {
-        case "OPEN":
-          kvData.setState(ContainerProtos.ContainerLifeCycleState.OPEN);
-          break;
-        case "CLOSING":
-          kvData.setState(ContainerProtos.ContainerLifeCycleState.CLOSING);
-          break;
-        case "CLOSED":
-          kvData.setState(ContainerProtos.ContainerLifeCycleState.CLOSED);
-          break;
-        default:
-          throw new IllegalStateException("Unexpected " +
-              "ContainerLifeCycleState " + state + " for the containerId " +
-              (long) nodes.get("containerId"));
-        }
-        return kvData;
-      }
-    }
-
-    //Below code is taken from snake yaml, as snakeyaml tries to fit the
-    // number if it fits in integer, otherwise returns long. So, slightly
-    // modified the code to return long in all cases.
-    private class ConstructLong extends AbstractConstruct {
-      public Object construct(Node node) {
-        String value = constructScalar((ScalarNode) node).toString()
-            .replaceAll("_", "");
-        int sign = +1;
-        char first = value.charAt(0);
-        if (first == '-') {
-          sign = -1;
-          value = value.substring(1);
-        } else if (first == '+') {
-          value = value.substring(1);
-        }
-        int base = 10;
-        if ("0".equals(value)) {
-          return Long.valueOf(0);
-        } else if (value.startsWith("0b")) {
-          value = value.substring(2);
-          base = 2;
-        } else if (value.startsWith("0x")) {
-          value = value.substring(2);
-          base = 16;
-        } else if (value.startsWith("0")) {
-          value = value.substring(1);
-          base = 8;
-        } else if (value.indexOf(':') != -1) {
-          String[] digits = value.split(":");
-          int bes = 1;
-          int val = 0;
-          for (int i = 0, j = digits.length; i < j; i++) {
-            val += (Long.parseLong(digits[(j - i) - 1]) * bes);
-            bes *= 60;
-          }
-          return createNumber(sign, String.valueOf(val), 10);
-        } else {
-          return createNumber(sign, value, 10);
-        }
-        return createNumber(sign, value, base);
-      }
-    }
-
-    private Number createNumber(int sign, String number, int radix) {
-      Number result;
-      if (sign < 0) {
-        number = "-" + number;
-      }
-      result = Long.valueOf(number, radix);
-      return result;
-    }
-  }
-
-}
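
The file above (re-created under the keyvalue package, per the diffstat) rested on two snakeyaml techniques: a Representer that filters which bean fields reach the .container file, and a Constructor keyed on a custom tag. A minimal standalone sketch of the dump side, using a hypothetical ContainerState bean in place of the real container classes:

    import java.io.FileWriter;
    import java.io.Writer;

    import org.yaml.snakeyaml.Yaml;
    import org.yaml.snakeyaml.introspector.BeanAccess;
    import org.yaml.snakeyaml.introspector.PropertyUtils;
    import org.yaml.snakeyaml.nodes.Tag;
    import org.yaml.snakeyaml.representer.Representer;

    public final class YamlTagSketch {

      /** Hypothetical stand-in for KeyValueContainerData. */
      public static class ContainerState {
        private long containerId;
        private String state;

        public ContainerState() { }

        public ContainerState(long containerId, String state) {
          this.containerId = containerId;
          this.state = state;
        }
      }

      public static void main(String[] args) throws Exception {
        // Access private fields directly, as the deleted code did.
        PropertyUtils propertyUtils = new PropertyUtils();
        propertyUtils.setBeanAccess(BeanAccess.FIELD);
        propertyUtils.setAllowReadOnlyProperties(true);

        Representer representer = new Representer();
        representer.setPropertyUtils(propertyUtils);
        // Emit the bean under a short custom tag instead of the class name.
        representer.addClassTag(ContainerState.class, new Tag("!ContainerState"));

        Yaml yaml = new Yaml(representer);
        try (Writer writer = new FileWriter("container.yaml")) {
          yaml.dump(new ContainerState(42L, "OPEN"), writer);
        }
      }
    }

Reading the file back needs the matching half, a Constructor registered under the same tag, which is what KeyValueDataConstructor did (its ConstructLong override exists because snakeyaml otherwise returns Integer for small values, while the bean fields are long).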

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
index 3b7e332..a5559aa 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
@@ -19,6 +19,9 @@
 package org.apache.hadoop.ozone.container.common.interfaces;
 
 
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerLifeCycleState;
 import org.apache.hadoop.hdds.scm.container.common.helpers.
     StorageContainerException;
 
@@ -69,6 +72,14 @@ public interface Container extends RwLock {
   ContainerData getContainerData() throws StorageContainerException;
 
   /**
+   * Get the Container Lifecycle state.
+   *
+   * @return ContainerLifeCycleState - Container State.
+   * @throws StorageContainerException
+   */
+  ContainerLifeCycleState getContainerState();
+
+  /**
    * Closes a open container, if it is already closed or does not exist a
    * StorageContainerException is thrown.
    *
@@ -76,5 +87,9 @@ public interface Container extends RwLock {
    */
   void close() throws StorageContainerException;
 
+  /**
+   * Return the ContainerType for the container.
+   */
+  ContainerProtos.ContainerType getContainerType();
 
 }
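
Both additions are simple delegations in practice. A minimal sketch, assuming ContainerData (in the common.impl package) exposes getState() and getContainerType() accessors; the SketchContainer name is hypothetical:

    import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
    import org.apache.hadoop.ozone.container.common.impl.ContainerData;

    /** Hypothetical partial implementation; only the two new methods shown. */
    public abstract class SketchContainer implements Container {

      private final ContainerData containerData;

      protected SketchContainer(ContainerData containerData) {
        this.containerData = containerData;
      }

      @Override
      public ContainerProtos.ContainerLifeCycleState getContainerState() {
        // Assumed accessor: the in-memory metadata tracks lifecycle state.
        return containerData.getState();
      }

      @Override
      public ContainerProtos.ContainerType getContainerType() {
        // Assumed accessor: the metadata records the container's type.
        return containerData.getContainerType();
      }
    }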

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
new file mode 100644
index 0000000..d08ad74
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.interfaces;
+
+import com.sun.jersey.spi.resource.Singleton;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerType;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
+
+import java.io.IOException;
+
+/**
+ * Dispatcher sends ContainerCommandRequests to Handler. Each Container Type
+ * should have an implementation for Handler.
+ */
+public class Handler {
+
+  protected final Configuration conf;
+  protected final ContainerSet containerSet;
+  protected final VolumeSet volumeSet;
+  protected final String scmID;
+
+  protected Handler(Configuration config, ContainerSet contSet,
+      VolumeSet volumeSet, String scmID) {
+    conf = config;
+    containerSet = contSet;
+    this.volumeSet = volumeSet;
+    this.scmID = scmID;
+  }
+
+  public static Handler getHandlerForContainerType(ContainerType containerType,
+      Configuration config, ContainerSet contSet, VolumeSet volumeSet,
+      String scmID) {
+    switch (containerType) {
+    case KeyValueContainer:
+      return KeyValueHandler.getInstance(config, contSet, volumeSet, scmID);
+    default:
+      throw new IllegalArgumentException("Handler for ContainerType: " +
+        containerType + "doesn't exist.");
+    }
+  }
+
+  public ContainerCommandResponseProto handle(
+      ContainerCommandRequestProto msg, Container container) {
+    return null;
+  }
+}
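
A sketch of how a dispatcher might use this factory per request; the SketchDispatcher below is illustrative, not the HddsDispatcher this patch adds:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
        .ContainerCommandRequestProto;
    import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
        .ContainerCommandResponseProto;
    import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
    import org.apache.hadoop.ozone.container.common.interfaces.Container;
    import org.apache.hadoop.ozone.container.common.interfaces.Handler;
    import org.apache.hadoop.ozone.container.common.volume.VolumeSet;

    class SketchDispatcher {
      private final Configuration conf;
      private final ContainerSet containerSet;
      private final VolumeSet volumeSet;
      private final String scmID;

      SketchDispatcher(Configuration conf, ContainerSet containerSet,
          VolumeSet volumeSet, String scmID) {
        this.conf = conf;
        this.containerSet = containerSet;
        this.volumeSet = volumeSet;
        this.scmID = scmID;
      }

      ContainerCommandResponseProto dispatch(
          ContainerCommandRequestProto request, Container container) {
        // Resolve the handler from the container's type; KeyValueContainer
        // is the only type wired up in this patch.
        Handler handler = Handler.getHandlerForContainerType(
            container.getContainerType(), conf, containerSet, volumeSet, scmID);
        return handler.handle(request, container);
      }
    }

Subclasses such as KeyValueHandler are expected to override handle(); the base implementation returning null is only a placeholder.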

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 176407d..fc7635e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -208,7 +208,7 @@ public class ContainerStateMachine extends BaseStateMachine {
   private CompletableFuture<Message> handleCreateContainer(
       ContainerCommandRequestProto requestProto) {
     long containerID =
-        requestProto.getCreateContainer().getContainerData().getContainerID();
+        requestProto.getCreateContainer().getContainerID();
     createContainerFutureMap.
         computeIfAbsent(containerID, k -> new CompletableFuture<>());
     return CompletableFuture.completedFuture(() -> ByteString.EMPTY);
@@ -265,8 +265,7 @@ public class ContainerStateMachine extends BaseStateMachine {
         Message message = runCommand(requestProto);
         if (cmdType == ContainerProtos.Type.CreateContainer) {
           long containerID =
-              requestProto.getCreateContainer()
-                  .getContainerData().getContainerID();
+              requestProto.getCreateContainer().getContainerID();
           createContainerFutureMap.remove(containerID).complete(message);
         }
         return CompletableFuture.completedFuture(message);
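
The two hunks are the ends of a handshake keyed on containerID: one side registers a CompletableFuture when a create starts, the other completes and removes it once the command has run. A minimal, self-contained sketch of that map pattern (illustrative names, plus a null guard the hunk itself does not have):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentHashMap;

    class CreateFutureMapSketch {
      private final ConcurrentHashMap<Long, CompletableFuture<String>> futures =
          new ConcurrentHashMap<>();

      /** Phase 1: remember that a create is in flight for this container. */
      void register(long containerID) {
        futures.computeIfAbsent(containerID, k -> new CompletableFuture<>());
      }

      /** Phase 2: the create has been applied; wake any waiter, then forget it. */
      void complete(long containerID, String result) {
        CompletableFuture<String> future = futures.remove(containerID);
        if (future != null) {
          future.complete(result);
        }
      }
    }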

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java
index 61aca79..9e052b0 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java
@@ -25,9 +25,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.common.InconsistentStorageStateException;
 import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume.VolumeState;
@@ -101,10 +101,13 @@ public class VolumeSet {
         new InstrumentedLock(getClass().getName(), LOG,
             new ReentrantLock(true),
             conf.getTimeDuration(
-                DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
-                DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT,
+                OzoneConfigKeys.HDDS_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY,
+                OzoneConfigKeys.HDDS_WRITE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT,
                 TimeUnit.MILLISECONDS),
-            300));
+            conf.getTimeDuration(
+                OzoneConfigKeys.HDDS_LOCK_SUPPRESS_WARNING_INTERVAL_MS_KEY,
+                OzoneConfigKeys.HDDS_LOCK_SUPPRESS_WARNING_INTERVAL_MS_DEAFULT,
+                TimeUnit.MILLISECONDS)));
 
     initializeVolumeSet();
   }
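
Both new arguments come from Configuration.getTimeDuration, which accepts suffixed values such as "5s" or "4000ms" and converts them to the requested unit. A small sketch with a hypothetical key and default, not the real OzoneConfigKeys constants:

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.conf.Configuration;

    class TimeDurationSketch {
      static long readThresholdMs(Configuration conf) {
        // Returns the configured duration normalized to milliseconds.
        return conf.getTimeDuration(
            "hdds.sketch.lock.reporting.threshold",  // hypothetical key
            4000,                                    // default, in ms
            TimeUnit.MILLISECONDS);
      }
    }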

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/ChunkManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/ChunkManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/ChunkManagerImpl.java
deleted file mode 100644
index 6ee0fd3..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/ChunkManagerImpl.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.keyvalue;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
-import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils;
-import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
-import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData;
-import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
-import org.apache.hadoop.ozone.container.common.interfaces.Container;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.file.Files;
-import java.nio.file.StandardCopyOption;
-import java.security.NoSuchAlgorithmException;
-import java.util.concurrent.ExecutionException;
-
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.CONTAINER_INTERNAL_ERROR;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.NO_SUCH_ALGORITHM;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNSUPPORTED_REQUEST;
-
-/**
- * This class is for performing chunk related operations.
- */
-public class ChunkManagerImpl implements ChunkManager {
-  static final Logger LOG = LoggerFactory.getLogger(ChunkManagerImpl.class);
-
-  /**
-   * writes a given chunk.
-   *
-   * @param container - Container for the chunk
-   * @param blockID - ID of the block
-   * @param info - ChunkInfo
-   * @param data - data of the chunk
-   * @param stage - Stage of the Chunk operation
-   * @throws StorageContainerException
-   */
-  public void writeChunk(Container container, BlockID blockID, ChunkInfo info,
-                         byte[] data, ContainerProtos.Stage stage)
-      throws StorageContainerException {
-
-    try {
-
-      KeyValueContainerData containerData = (KeyValueContainerData) container
-          .getContainerData();
-
-      File chunkFile = ChunkUtils.validateChunk(containerData, info);
-      File tmpChunkFile = getTmpChunkFile(chunkFile, info);
-
-      LOG.debug("writing chunk:{} chunk stage:{} chunk file:{} tmp chunk file",
-          info.getChunkName(), stage, chunkFile, tmpChunkFile);
-
-      switch (stage) {
-      case WRITE_DATA:
-        // Initially writes to temporary chunk file.
-        ChunkUtils.writeData(tmpChunkFile, info, data);
-        break;
-      case COMMIT_DATA:
-        // commit the data, means move chunk data from temporary chunk file
-        // to actual chunk file.
-        long sizeDiff = tmpChunkFile.length() - chunkFile.length();
-        commitChunk(tmpChunkFile, chunkFile);
-        containerData.incrBytesUsed(sizeDiff);
-        containerData.incrWriteCount();
-        containerData.incrWriteBytes(sizeDiff);
-        break;
-      case COMBINED:
-        // directly write to the chunk file
-        ChunkUtils.writeData(chunkFile, info, data);
-        containerData.incrBytesUsed(info.getLen());
-        containerData.incrWriteCount();
-        containerData.incrWriteBytes(info.getLen());
-        break;
-      default:
-        throw new IOException("Can not identify write operation.");
-      }
-    } catch (StorageContainerException ex) {
-      throw ex;
-    } catch (NoSuchAlgorithmException ex) {
-      LOG.error("write data failed. error: {}", ex);
-      throw new StorageContainerException("Internal error: ", ex,
-          NO_SUCH_ALGORITHM);
-    } catch (ExecutionException  | IOException ex) {
-      LOG.error("write data failed. error: {}", ex);
-      throw new StorageContainerException("Internal error: ", ex,
-          CONTAINER_INTERNAL_ERROR);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      LOG.error("write data failed. error: {}", e);
-      throw new StorageContainerException("Internal error: ", e,
-          CONTAINER_INTERNAL_ERROR);
-    }
-  }
-
-  /**
-   * reads the data defined by a chunk.
-   *
-   * @param container - Container for the chunk
-   * @param blockID - ID of the block.
-   * @param info - ChunkInfo.
-   * @return byte array
-   * @throws StorageContainerException
-   * TODO: Right now we do not support partial reads and writes of chunks.
-   * TODO: Explore if we need to do that for ozone.
-   */
-  public byte[] readChunk(Container container, BlockID blockID, ChunkInfo info)
-      throws StorageContainerException {
-    try {
-      KeyValueContainerData containerData = (KeyValueContainerData) container
-          .getContainerData();
-      ByteBuffer data;
-
-      // Checking here, which layout version the container is, and reading
-      // the chunk file in that format.
-      // In version1, we verify checksum if it is available and return data
-      // of the chunk file.
-      if (containerData.getLayOutVersion() == ChunkLayOutVersion
-          .getLatestVersion().getVersion()) {
-        File chunkFile = ChunkUtils.getChunkFile(containerData, info);
-        data = ChunkUtils.readData(chunkFile, info);
-        containerData.incrReadCount();
-        containerData.incrReadBytes(chunkFile.length());
-        return data.array();
-      }
-    } catch(NoSuchAlgorithmException ex) {
-      LOG.error("read data failed. error: {}", ex);
-      throw new StorageContainerException("Internal error: ",
-          ex, NO_SUCH_ALGORITHM);
-    } catch (ExecutionException ex) {
-      LOG.error("read data failed. error: {}", ex);
-      throw new StorageContainerException("Internal error: ",
-          ex, CONTAINER_INTERNAL_ERROR);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      LOG.error("read data failed. error: {}", e);
-      throw new StorageContainerException("Internal error: ",
-          e, CONTAINER_INTERNAL_ERROR);
-    }
-    return null;
-  }
-
-  /**
-   * Deletes a given chunk.
-   *
-   * @param container - Container for the chunk
-   * @param blockID - ID of the block
-   * @param info - Chunk Info
-   * @throws StorageContainerException
-   */
-  public void deleteChunk(Container container, BlockID blockID, ChunkInfo info)
-      throws StorageContainerException {
-    Preconditions.checkNotNull(blockID, "Block ID cannot be null.");
-    KeyValueContainerData containerData = (KeyValueContainerData) container
-        .getContainerData();
-    // Checking here, which layout version the container is, and performing
-    // deleting chunk operation.
-    // In version1, we have only chunk file.
-    if (containerData.getLayOutVersion() == ChunkLayOutVersion
-        .getLatestVersion().getVersion()) {
-      File chunkFile = ChunkUtils.getChunkFile(containerData, info);
-      if ((info.getOffset() == 0) && (info.getLen() == chunkFile.length())) {
-        FileUtil.fullyDelete(chunkFile);
-        containerData.decrBytesUsed(chunkFile.length());
-      } else {
-        LOG.error("Not Supported Operation. Trying to delete a " +
-            "chunk that is in shared file. chunk info : " + info.toString());
-        throw new StorageContainerException("Not Supported Operation. " +
-            "Trying to delete a chunk that is in shared file. chunk info : "
-            + info.toString(), UNSUPPORTED_REQUEST);
-      }
-    }
-  }
-
-  /**
-   * Shutdown the chunkManager.
-   *
-   * In the chunkManager we haven't acquired any resources, so nothing to do
-   * here.
-   */
-
-  public void shutdown() {
-    //TODO: need to revisit this during integration of container IO.
-  }
-
-  /**
-   * Returns the temporary chunkFile path.
-   * @param chunkFile
-   * @param info
-   * @return temporary chunkFile path
-   * @throws StorageContainerException
-   */
-  private File getTmpChunkFile(File chunkFile, ChunkInfo info)
-      throws StorageContainerException {
-    return new File(chunkFile.getParent(),
-        chunkFile.getName() +
-            OzoneConsts.CONTAINER_CHUNK_NAME_DELIMITER +
-            OzoneConsts.CONTAINER_TEMPORARY_CHUNK_PREFIX);
-  }
-
-  /**
-   * Commit the chunk by renaming the temporary chunk file to chunk file.
-   * @param tmpChunkFile
-   * @param chunkFile
-   * @throws IOException
-   */
-  private void commitChunk(File tmpChunkFile, File chunkFile) throws
-      IOException {
-    Files.move(tmpChunkFile.toPath(), chunkFile.toPath(),
-        StandardCopyOption.REPLACE_EXISTING);
-  }
-
-}
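
The write path above (re-created under keyvalue/impl, per the diffstat) is a two-phase commit on the filesystem: WRITE_DATA lands bytes in a temporary sibling file, and COMMIT_DATA renames it over the final chunk file. A standalone sketch of that pattern with illustrative names:

    import java.io.File;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.StandardCopyOption;

    class TmpFileCommitSketch {
      static void writeAndCommit(File chunkFile, byte[] data) throws IOException {
        File tmp = new File(chunkFile.getParent(), chunkFile.getName() + ".tmp");
        // WRITE_DATA stage: bytes go to the temporary file only.
        Files.write(tmp.toPath(), data);
        // COMMIT_DATA stage: REPLACE_EXISTING makes the rename safe to retry.
        Files.move(tmp.toPath(), chunkFile.toPath(),
            StandardCopyOption.REPLACE_EXISTING);
      }
    }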

