HDDS-887. Add DispatcherContext info to Dispatcher from containerStateMachine. 
Contributed by Shashikant Banerjee.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5a3c7714
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5a3c7714
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5a3c7714

Branch: refs/heads/HDFS-12943
Commit: 5a3c7714c4d7822827ec365ea187fa8f43eb0e45
Parents: d15dc43
Author: Shashikant Banerjee <shashik...@apache.org>
Authored: Sun Dec 2 08:00:35 2018 +0530
Committer: Shashikant Banerjee <shashik...@apache.org>
Committed: Sun Dec 2 08:00:35 2018 +0530

----------------------------------------------------------------------
 .../main/proto/DatanodeContainerProtocol.proto  |   8 --
 .../container/common/impl/HddsDispatcher.java   |   8 +-
 .../common/interfaces/ContainerDispatcher.java  |   5 +-
 .../container/common/interfaces/Handler.java    |   4 +-
 .../transport/server/GrpcXceiverService.java    |   3 +-
 .../transport/server/XceiverServerGrpc.java     |   2 +-
 .../server/ratis/ContainerStateMachine.java     | 120 +++++++----------
 .../server/ratis/DispatcherContext.java         | 133 +++++++++++++++++++
 .../container/keyvalue/KeyValueHandler.java     |  75 +++++++----
 .../container/keyvalue/helpers/BlockUtils.java  |   8 +-
 .../keyvalue/helpers/SmallFileUtils.java        |  10 +-
 .../keyvalue/impl/ChunkManagerImpl.java         |   4 +-
 .../keyvalue/interfaces/ChunkManager.java       |   5 +-
 .../common/impl/TestHddsDispatcher.java         |  14 +-
 .../keyvalue/TestChunkManagerImpl.java          |  17 +--
 .../container/keyvalue/TestKeyValueHandler.java |  48 +++----
 .../common/impl/TestContainerPersistence.java   |  20 +--
 .../transport/server/ratis/TestCSMMetrics.java  |   3 +-
 .../container/server/TestContainerServer.java   |   6 +-
 .../genesis/BenchMarkDatanodeDispatcher.java    |  16 +--
 20 files changed, 321 insertions(+), 188 deletions(-)
----------------------------------------------------------------------
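
Overview: this patch moves the write-chunk stage (WRITE_DATA / COMMIT_DATA /
COMBINED) and the read-from-tmp-file flag out of the wire protocol (the Stage
enum and the readFromTmpFile field are removed from
DatanodeContainerProtocol.proto) and into a new in-memory DispatcherContext
that ContainerStateMachine builds and hands to the dispatcher alongside each
request. A minimal sketch of the new call pattern, assuming a Ratis call site
where term, entryIndex and requestProto are in scope (illustrative names, not
code from the patch):

    // Ratis-side callers build a context carrying term, log index and stage;
    // non-Ratis callers (e.g. the standalone gRPC path) simply pass null.
    DispatcherContext context = new DispatcherContext.Builder()
        .setTerm(term)               // Ratis term serving this request
        .setLogIndex(entryIndex)     // Raft log index the entry belongs to
        .setStage(DispatcherContext.WriteChunkStage.WRITE_DATA)
        .build();
    ContainerCommandResponseProto response =
        dispatcher.dispatch(requestProto, context);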


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a3c7714/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
index 5237af8..661d910 100644
--- a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
+++ b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
@@ -373,17 +373,10 @@ enum ChecksumType {
     MD5 = 5;
 }
 
-enum Stage {
-    WRITE_DATA = 1;
-    COMMIT_DATA = 2;
-    COMBINED = 3;
-}
-
 message  WriteChunkRequestProto  {
   required DatanodeBlockID blockID = 1;
   required ChunkInfo chunkData = 2;
   optional bytes data = 3;
-  optional Stage stage = 4 [default = COMBINED];
 }
 
 message  WriteChunkResponseProto {
@@ -392,7 +385,6 @@ message  WriteChunkResponseProto {
 message  ReadChunkRequestProto  {
   required DatanodeBlockID blockID = 1;
   required ChunkInfo chunkData = 2;
-  optional bool readFromTmpFile = 3 [default = false];
 }
 
 message  ReadChunkResponseProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a3c7714/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index 352cc86..c5c51a3 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -47,6 +47,8 @@ import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis
+    .DispatcherContext;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
@@ -133,7 +135,7 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
 
   @Override
   public ContainerCommandResponseProto dispatch(
-      ContainerCommandRequestProto msg) {
+      ContainerCommandRequestProto msg, DispatcherContext dispatcherContext) {
     Preconditions.checkNotNull(msg);
     LOG.trace("Command {}, trace ID: {} ", msg.getCmdType().toString(),
         msg.getTraceID());
@@ -194,7 +196,7 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
       audit(action, eventType, params, AuditEventStatus.FAILURE, ex);
       return ContainerUtils.logAndReturnError(LOG, ex, msg);
     }
-    responseProto = handler.handle(msg, container);
+    responseProto = handler.handle(msg, container, dispatcherContext);
     if (responseProto != null) {
       metrics.incContainerOpsLatencies(cmdType, System.nanoTime() - startTime);
 
@@ -269,7 +271,7 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
     // TODO: Assuming the container type to be KeyValueContainer for now.
     // We need to get container type from the containerRequest.
     Handler handler = getHandler(containerType);
-    handler.handle(requestBuilder.build(), null);
+    handler.handle(requestBuilder.build(), null, null);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a3c7714/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
index 7a22143..46a0b55 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 
 /**
  * Dispatcher acts as the bridge between the transport layer and
@@ -37,9 +38,11 @@ public interface ContainerDispatcher {
   /**
    * Dispatches commands to container layer.
    * @param msg - Command Request
+   * @param context - Context info related to ContainerStateMachine
    * @return Command Response
    */
-  ContainerCommandResponseProto dispatch(ContainerCommandRequestProto msg);
+  ContainerCommandResponseProto dispatch(ContainerCommandRequestProto msg,
+      DispatcherContext context);
 
   /**
    * Validates whether the container command should be executed on the pipeline

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a3c7714/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
index ad55618..9f520d5 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerExcep
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
 import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
@@ -103,7 +104,8 @@ public abstract class Handler {
   }
 
   public abstract ContainerCommandResponseProto handle(
-      ContainerCommandRequestProto msg, Container container);
+      ContainerCommandRequestProto msg, Container container,
+      DispatcherContext dispatcherContext);
 
   /**
    * Import container data from a raw input stream.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a3c7714/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
index 5fc3661..37b7d5d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
@@ -53,7 +53,8 @@ public class GrpcXceiverService extends
       @Override
       public void onNext(ContainerCommandRequestProto request) {
         try {
-          ContainerCommandResponseProto resp = dispatcher.dispatch(request);
+          ContainerCommandResponseProto resp =
+              dispatcher.dispatch(request, null);
           responseObserver.onNext(resp);
         } catch (Throwable e) {
           LOG.error("{} got exception when processing"

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a3c7714/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
index 4f7799d..4ed9653 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
@@ -130,7 +130,7 @@ public final class XceiverServerGrpc implements XceiverServerSpi {
   public void submitRequest(ContainerCommandRequestProto request,
       HddsProtos.PipelineID pipelineID) throws IOException {
     ContainerProtos.ContainerCommandResponseProto response =
-        storageContainer.dispatch(request);
+        storageContainer.dispatch(request, null);
     if (response.getResult() != ContainerProtos.Result.SUCCESS) {
       throw new StorageContainerException(response.getMessage(),
           response.getResult());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a3c7714/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 2e0c70e..b693e9e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -22,8 +22,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import org.apache.hadoop.hdds.HddsUtils;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
 import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftGroupId;
@@ -35,7 +33,6 @@ import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
 import org.apache.ratis.thirdparty.com.google.protobuf
     .InvalidProtocolBufferException;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Stage;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
@@ -237,7 +234,6 @@ public class ContainerStateMachine extends BaseStateMachine {
       final WriteChunkRequestProto dataWriteChunkProto =
           WriteChunkRequestProto
               .newBuilder(write)
-              .setStage(Stage.WRITE_DATA)
               .build();
       ContainerCommandRequestProto dataContainerCommandProto =
           ContainerCommandRequestProto
@@ -252,7 +248,6 @@ public class ContainerStateMachine extends BaseStateMachine {
               .setChunkData(write.getChunkData())
               // skipping the data field as it is
               // already set in statemachine data proto
-              .setStage(Stage.COMMIT_DATA)
               .build();
       ContainerCommandRequestProto commitContainerCommandProto =
           ContainerCommandRequestProto
@@ -292,15 +287,18 @@ public class ContainerStateMachine extends BaseStateMachine {
   }
 
   private ContainerCommandResponseProto dispatchCommand(
-      ContainerCommandRequestProto requestProto) {
+      ContainerCommandRequestProto requestProto,
+      DispatcherContext context) {
     LOG.trace("dispatch {}", requestProto);
-    ContainerCommandResponseProto response = dispatcher.dispatch(requestProto);
+    ContainerCommandResponseProto response =
+        dispatcher.dispatch(requestProto, context);
     LOG.trace("response {}", response);
     return response;
   }
 
-  private Message runCommand(ContainerCommandRequestProto requestProto) {
-    return dispatchCommand(requestProto)::toByteString;
+  private Message runCommand(ContainerCommandRequestProto requestProto,
+      DispatcherContext context) {
+    return dispatchCommand(requestProto, context)::toByteString;
   }
 
   private ExecutorService getCommandExecutor(
@@ -310,7 +308,7 @@ public class ContainerStateMachine extends BaseStateMachine {
   }
 
   private CompletableFuture<Message> handleWriteChunk(
-      ContainerCommandRequestProto requestProto, long entryIndex) {
+      ContainerCommandRequestProto requestProto, long entryIndex, long term) {
     final WriteChunkRequestProto write = requestProto.getWriteChunk();
     RaftServer server = ratisServer.getServer();
     Preconditions.checkState(server instanceof RaftServerProxy);
@@ -321,8 +319,14 @@ public class ContainerStateMachine extends BaseStateMachine {
     } catch (IOException ioe) {
       return completeExceptionally(ioe);
     }
+    DispatcherContext context =
+        new DispatcherContext.Builder()
+            .setTerm(term)
+            .setLogIndex(entryIndex)
+            .setStage(DispatcherContext.WriteChunkStage.WRITE_DATA)
+            .build();
     CompletableFuture<Message> writeChunkFuture = CompletableFuture
-        .supplyAsync(() -> runCommand(requestProto), chunkExecutor);
+        .supplyAsync(() -> runCommand(requestProto, context), chunkExecutor);
     writeChunkFutureMap.put(entryIndex, writeChunkFuture);
     LOG.debug("writeChunk writeStateMachineData : blockId " + write.getBlockID()
         + " logIndex " + entryIndex + " chunkName " + write.getChunkData()
@@ -355,7 +359,8 @@ public class ContainerStateMachine extends BaseStateMachine {
       // CreateContainer will happen as a part of writeChunk only.
       switch (cmdType) {
       case WriteChunk:
-        return handleWriteChunk(requestProto, entry.getIndex());
+        return handleWriteChunk(requestProto, entry.getIndex(),
+            entry.getTerm());
       default:
         throw new IllegalStateException("Cmd Type:" + cmdType
             + " should not have state machine data");
@@ -372,39 +377,36 @@ public class ContainerStateMachine extends BaseStateMachine {
       metrics.incNumReadStateMachineOps();
       final ContainerCommandRequestProto requestProto =
           getRequestProto(request.getContent());
-      return CompletableFuture.completedFuture(runCommand(requestProto));
+      return CompletableFuture.completedFuture(runCommand(requestProto, null));
     } catch (IOException e) {
       metrics.incNumReadStateMachineFails();
       return completeExceptionally(e);
     }
   }
 
-  private ByteString readStateMachineData(ContainerCommandRequestProto
-                                              requestProto) {
+  private ByteString readStateMachineData(
+      ContainerCommandRequestProto requestProto, long term, long index) {
     WriteChunkRequestProto writeChunkRequestProto =
         requestProto.getWriteChunk();
-    // Assert that store log entry is for COMMIT_DATA, the WRITE_DATA is
-    // written through writeStateMachineData.
-    Preconditions
-        .checkArgument(writeChunkRequestProto.getStage() == Stage.COMMIT_DATA);
-
     // prepare the chunk to be read
     ReadChunkRequestProto.Builder readChunkRequestProto =
         ReadChunkRequestProto.newBuilder()
             .setBlockID(writeChunkRequestProto.getBlockID())
-            .setChunkData(writeChunkRequestProto.getChunkData())
-            // set readFromTempFile to true in case, the chunkFile does
-            // not exist as applyTransaction is not executed for this entry yet.
-            .setReadFromTmpFile(true);
+            .setChunkData(writeChunkRequestProto.getChunkData());
     ContainerCommandRequestProto dataContainerCommandProto =
         ContainerCommandRequestProto.newBuilder(requestProto)
             .setCmdType(Type.ReadChunk)
             .setReadChunk(readChunkRequestProto)
             .build();
-
+    DispatcherContext context =
+        new DispatcherContext.Builder()
+            .setTerm(term)
+            .setLogIndex(index)
+            .setReadFromTmpFile(true)
+            .build();
     // read the chunk
     ContainerCommandResponseProto response =
-        dispatchCommand(dataContainerCommandProto);
+        dispatchCommand(dataContainerCommandProto, context);
     ReadChunkResponseProto responseProto = response.getReadChunk();
 
     ByteString data = responseProto.getData();
@@ -416,14 +418,14 @@ public class ContainerStateMachine extends BaseStateMachine {
   /**
    * Reads the Entry from the Cache or loads it back by reading from disk.
    */
-  private ByteString getCachedStateMachineData(Long logIndex,
+  private ByteString getCachedStateMachineData(Long logIndex, long term,
       ContainerCommandRequestProto requestProto) throws ExecutionException {
     try {
       return reconstructWriteChunkRequest(
           stateMachineDataCache.get(logIndex, new Callable<ByteString>() {
             @Override
             public ByteString call() throws Exception {
-              return readStateMachineData(requestProto);
+              return readStateMachineData(requestProto, term, logIndex);
             }
           }), requestProto);
     } catch (ExecutionException e) {
@@ -439,7 +441,7 @@ public class ContainerStateMachine extends BaseStateMachine {
     final WriteChunkRequestProto.Builder dataWriteChunkProto =
         WriteChunkRequestProto.newBuilder(writeChunkRequestProto)
             // adding the state machine data
-            .setData(data).setStage(Stage.WRITE_DATA);
+            .setData(data);
 
     ContainerCommandRequestProto.Builder newStateMachineProto =
         ContainerCommandRequestProto.newBuilder(requestProto)
@@ -486,7 +488,8 @@ public class ContainerStateMachine extends BaseStateMachine {
         CompletableFuture<ByteString> future = new CompletableFuture<>();
         return future.supplyAsync(() -> {
           try {
-            return getCachedStateMachineData(entry.getIndex(), requestProto);
+            return getCachedStateMachineData(entry.getIndex(), entry.getTerm(),
+                requestProto);
           } catch (ExecutionException e) {
             future.completeExceptionally(e);
             return null;
@@ -524,6 +527,10 @@ public class ContainerStateMachine extends BaseStateMachine {
   @Override
   public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
     long index = trx.getLogEntry().getIndex();
+    DispatcherContext.Builder builder =
+        new DispatcherContext.Builder()
+            .setTerm(trx.getLogEntry().getTerm())
+            .setLogIndex(index);
 
     // ApplyTransaction call can come with an entryIndex much greater than
     // lastIndex updated because in between entries in the raft log can be
@@ -539,51 +546,16 @@ public class ContainerStateMachine extends BaseStateMachine {
           getRequestProto(trx.getStateMachineLogEntry().getLogData());
       Type cmdType = requestProto.getCmdType();
       CompletableFuture<Message> future;
-      if (cmdType == Type.PutBlock || cmdType == Type.PutSmallFile) {
-        BlockData blockData;
-        ContainerProtos.BlockData blockDataProto = cmdType == Type.PutBlock ?
-            requestProto.getPutBlock().getBlockData() :
-            requestProto.getPutSmallFile().getBlock().getBlockData();
-
-        // set the blockCommitSequenceId
-        try {
-          blockData = BlockData.getFromProtoBuf(blockDataProto);
-        } catch (IOException ioe) {
-          LOG.error("unable to retrieve blockData info for Block {}",
-              blockDataProto.getBlockID());
-          return completeExceptionally(ioe);
-        }
-        blockData.setBlockCommitSequenceId(index);
-        final ContainerProtos.PutBlockRequestProto putBlockRequestProto =
-            ContainerProtos.PutBlockRequestProto
-                .newBuilder(requestProto.getPutBlock())
-                .setBlockData(blockData.getProtoBufMessage()).build();
-        ContainerCommandRequestProto containerCommandRequestProto;
-        if (cmdType == Type.PutSmallFile) {
-          ContainerProtos.PutSmallFileRequestProto smallFileRequestProto =
-              ContainerProtos.PutSmallFileRequestProto
-                  .newBuilder(requestProto.getPutSmallFile())
-                  .setBlock(putBlockRequestProto).build();
-          containerCommandRequestProto =
-              ContainerCommandRequestProto.newBuilder(requestProto)
-                  .setPutSmallFile(smallFileRequestProto).build();
-        } else {
-          containerCommandRequestProto =
-              ContainerCommandRequestProto.newBuilder(requestProto)
-                  .setPutBlock(putBlockRequestProto).build();
-        }
-        future = CompletableFuture
-            .supplyAsync(() -> runCommand(containerCommandRequestProto),
-                getCommandExecutor(requestProto));
-      } else {
-        // Make sure that in write chunk, the user data is not set
-        if (cmdType == Type.WriteChunk) {
-          Preconditions.checkArgument(requestProto
-              .getWriteChunk().getData().isEmpty());
-        }
-        future = CompletableFuture.supplyAsync(() -> runCommand(requestProto),
-            getCommandExecutor(requestProto));
+      // Make sure that in write chunk, the user data is not set
+      if (cmdType == Type.WriteChunk) {
+        Preconditions
+            .checkArgument(requestProto.getWriteChunk().getData().isEmpty());
+        builder
+            .setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA);
       }
+      future = CompletableFuture
+          .supplyAsync(() -> runCommand(requestProto, builder.build()),
+              getCommandExecutor(requestProto));
       lastIndex = index;
       future.thenAccept(m -> {
         final Long previous =
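
Taken together, the hunks above map each Ratis callback onto the new context
roughly as follows (a condensed restatement of the code above, not additional
patch content):

    // writeStateMachineData: chunk data still inline, written to a tmp file.
    new DispatcherContext.Builder().setTerm(term).setLogIndex(entryIndex)
        .setStage(DispatcherContext.WriteChunkStage.WRITE_DATA).build();
    // readStateMachineData: the chunk may so far exist only as a tmp file.
    new DispatcherContext.Builder().setTerm(term).setLogIndex(index)
        .setReadFromTmpFile(true).build();
    // applyTransaction: a WriteChunk entry commits previously written data.
    new DispatcherContext.Builder().setTerm(trx.getLogEntry().getTerm())
        .setLogIndex(index)
        .setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA).build();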

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a3c7714/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java
new file mode 100644
index 0000000..a46e6b8
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.transport.server.ratis;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * DispatcherContext class holds transport protocol specific context info
+ * required for execution of container commands over the container dispatcher.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class DispatcherContext {
+  /**
+   * Determines which stage of writeChunk a write chunk request is for.
+   */
+  public enum WriteChunkStage {
+    WRITE_DATA, COMMIT_DATA, COMBINED
+  }
+
+  // whether the chunk data needs to be written or committed or both
+  private final WriteChunkStage stage;
+  // indicates whether the read from tmp chunk files is allowed
+  private final boolean readFromTmpFile;
+  // which term the request is being served in Ratis
+  private final long term;
+  // the log index in Ratis log to which the request belongs to
+  private final long logIndex;
+
+  private DispatcherContext(long term, long index, WriteChunkStage stage,
+      boolean readFromTmpFile) {
+    this.term = term;
+    this.logIndex = index;
+    this.stage = stage;
+    this.readFromTmpFile = readFromTmpFile;
+  }
+
+  public long getLogIndex() {
+    return logIndex;
+  }
+
+  public boolean isReadFromTmpFile() {
+    return readFromTmpFile;
+  }
+
+  public long getTerm() {
+    return term;
+  }
+
+  public WriteChunkStage getStage() {
+    return stage;
+  }
+
+  /**
+   * Builder class for building DispatcherContext.
+   */
+  public static final class Builder {
+    private WriteChunkStage stage = WriteChunkStage.COMBINED;
+    private boolean readFromTmpFile = false;
+    private long term;
+    private long logIndex;
+
+    /**
+     * Sets the WriteChunkStage.
+     *
+     * @param stage WriteChunk Stage
+     * @return DispatcherContext.Builder
+     */
+    public Builder setStage(WriteChunkStage stage) {
+      this.stage = stage;
+      return this;
+    }
+
+    /**
+     * Sets the flag for reading from tmp chunk files.
+     *
+     * @param readFromTmpFile whether to read from tmp chunk file or not
+     * @return DispatcherContext.Builder
+     */
+    public Builder setReadFromTmpFile(boolean readFromTmpFile) {
+      this.readFromTmpFile = readFromTmpFile;
+      return this;
+    }
+
+    /**
+     * Sets the current term for the container request from Ratis.
+     *
+     * @param term current term
+     * @return DispatcherContext.Builder
+     */
+    public Builder setTerm(long term) {
+      this.term = term;
+      return this;
+    }
+
+    /**
+     * Sets the logIndex for the container request from Ratis.
+     *
+     * @param logIndex log index
+     * @return DispatcherContext.Builder
+     */
+    public Builder setLogIndex(long logIndex) {
+      this.logIndex = logIndex;
+      return this;
+    }
+
+    /**
+   * Builds and returns DispatcherContext instance.
+     *
+     * @return DispatcherContext
+     */
+    public DispatcherContext build() {
+      return new DispatcherContext(term, logIndex, stage, readFromTmpFile);
+    }
+
+  }
+}
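
Note that a Builder with no setters applied yields stage COMBINED and
readFromTmpFile false, mirroring the defaults of the proto fields removed
above, so a bare context behaves like the old combined write path. A tiny
illustrative check (editor's sketch, not part of the patch):

    DispatcherContext defaults = new DispatcherContext.Builder().build();
    // stage defaults to COMBINED, readFromTmpFile to false
    assert defaults.getStage() == DispatcherContext.WriteChunkStage.COMBINED;
    assert !defaults.isReadFromTmpFile();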

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a3c7714/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index a16129e..b4cfcd0 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -58,6 +58,10 @@ import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis
+    .DispatcherContext;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis
+    .DispatcherContext.WriteChunkStage;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume
     .RoundRobinVolumeChoosingPolicy;
@@ -81,8 +85,6 @@ import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import static org.apache.hadoop.hdds.HddsConfigKeys
     .HDDS_DATANODE_VOLUME_CHOOSING_POLICY;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.*;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Stage;
 import static org.apache.hadoop.ozone.OzoneConfigKeys
     .OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
 import static org.apache.hadoop.ozone.OzoneConfigKeys
@@ -146,7 +148,8 @@ public class KeyValueHandler extends Handler {
 
   @Override
   public ContainerCommandResponseProto handle(
-      ContainerCommandRequestProto request, Container container) {
+      ContainerCommandRequestProto request, Container container,
+      DispatcherContext dispatcherContext) {
 
     Type cmdType = request.getCmdType();
     KeyValueContainer kvContainer = (KeyValueContainer) container;
@@ -164,7 +167,7 @@ public class KeyValueHandler extends Handler {
     case CloseContainer:
       return handleCloseContainer(request, kvContainer);
     case PutBlock:
-      return handlePutBlock(request, kvContainer);
+      return handlePutBlock(request, kvContainer, dispatcherContext);
     case GetBlock:
       return handleGetBlock(request, kvContainer);
     case DeleteBlock:
@@ -172,17 +175,17 @@ public class KeyValueHandler extends Handler {
     case ListBlock:
       return handleUnsupportedOp(request);
     case ReadChunk:
-      return handleReadChunk(request, kvContainer);
+      return handleReadChunk(request, kvContainer, dispatcherContext);
     case DeleteChunk:
       return handleDeleteChunk(request, kvContainer);
     case WriteChunk:
-      return handleWriteChunk(request, kvContainer);
+      return handleWriteChunk(request, kvContainer, dispatcherContext);
     case ListChunk:
       return handleUnsupportedOp(request);
     case CompactChunk:
       return handleUnsupportedOp(request);
     case PutSmallFile:
-      return handlePutSmallFile(request, kvContainer);
+      return handlePutSmallFile(request, kvContainer, dispatcherContext);
     case GetSmallFile:
       return handleGetSmallFile(request, kvContainer);
     case GetCommittedBlockLength:
@@ -392,7 +395,8 @@ public class KeyValueHandler extends Handler {
    * Handle Put Block operation. Calls BlockManager to process the request.
    */
   ContainerCommandResponseProto handlePutBlock(
-      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+      ContainerCommandRequestProto request, KeyValueContainer kvContainer,
+      DispatcherContext dispatcherContext) {
 
     long blockLength;
     if (!request.hasPutBlock()) {
@@ -401,14 +405,18 @@ public class KeyValueHandler extends Handler {
       return ContainerUtils.malformedRequest(request);
     }
 
+    BlockData blockData;
     try {
       checkContainerOpen(kvContainer);
 
-      BlockData blockData = BlockData.getFromProtoBuf(
+      blockData = BlockData.getFromProtoBuf(
           request.getPutBlock().getBlockData());
       Preconditions.checkNotNull(blockData);
+      long bcsId =
+          dispatcherContext == null ? 0 : dispatcherContext.getLogIndex();
+      blockData.setBlockCommitSequenceId(bcsId);
       long numBytes = blockData.getProtoBufMessage().toByteArray().length;
-      blockLength = blockManager.putBlock(kvContainer, blockData);
+      blockManager.putBlock(kvContainer, blockData);
       metrics.incContainerBytesStats(Type.PutBlock, numBytes);
     } catch (StorageContainerException ex) {
       return ContainerUtils.logAndReturnError(LOG, ex, request);
@@ -418,7 +426,7 @@ public class KeyValueHandler extends Handler {
           request);
     }
 
-    return BlockUtils.putBlockResponseSuccess(request, blockLength);
+    return BlockUtils.putBlockResponseSuccess(request, blockData);
   }
 
   /**
@@ -514,7 +522,8 @@ public class KeyValueHandler extends Handler {
    * Handle Read Chunk operation. Calls ChunkManager to process the request.
    */
   ContainerCommandResponseProto handleReadChunk(
-      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+      ContainerCommandRequestProto request, KeyValueContainer kvContainer,
+      DispatcherContext dispatcherContext) {
 
     if (!request.hasReadChunk()) {
       LOG.debug("Malformed Read Chunk request. trace ID: {}",
@@ -531,8 +540,10 @@ public class KeyValueHandler extends Handler {
           .getChunkData());
       Preconditions.checkNotNull(chunkInfo);
 
-      data = chunkManager.readChunk(kvContainer, blockID, chunkInfo,
-          request.getReadChunk().getReadFromTmpFile());
+      boolean isReadFromTmpFile = dispatcherContext == null ? false :
+          dispatcherContext.isReadFromTmpFile();
+      data = chunkManager
+          .readChunk(kvContainer, blockID, chunkInfo, isReadFromTmpFile);
       metrics.incContainerBytesStats(Type.ReadChunk, data.length);
     } catch (StorageContainerException ex) {
       return ContainerUtils.logAndReturnError(LOG, ex, request);
@@ -583,7 +594,8 @@ public class KeyValueHandler extends Handler {
    * Handle Write Chunk operation. Calls ChunkManager to process the request.
    */
   ContainerCommandResponseProto handleWriteChunk(
-      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+      ContainerCommandRequestProto request, KeyValueContainer kvContainer,
+      DispatcherContext dispatcherContext) {
 
     if (!request.hasWriteChunk()) {
       LOG.debug("Malformed Write Chunk request. trace ID: {}",
@@ -602,17 +614,19 @@ public class KeyValueHandler extends Handler {
       Preconditions.checkNotNull(chunkInfo);
 
       ByteBuffer data = null;
-      if (request.getWriteChunk().getStage() == Stage.WRITE_DATA ||
-          request.getWriteChunk().getStage() == Stage.COMBINED) {
+      WriteChunkStage stage =
+          dispatcherContext == null ? WriteChunkStage.COMBINED :
+              dispatcherContext.getStage();
+      if (stage == WriteChunkStage.WRITE_DATA ||
+          stage == WriteChunkStage.COMBINED) {
         data = request.getWriteChunk().getData().asReadOnlyByteBuffer();
       }
 
-      chunkManager.writeChunk(kvContainer, blockID, chunkInfo, data,
-          request.getWriteChunk().getStage());
+      chunkManager.writeChunk(kvContainer, blockID, chunkInfo, data, stage);
 
       // We should increment stats after writeChunk
-      if (request.getWriteChunk().getStage() == Stage.WRITE_DATA ||
-          request.getWriteChunk().getStage() == Stage.COMBINED) {
+      if (stage == WriteChunkStage.WRITE_DATA ||
+          stage == WriteChunkStage.COMBINED) {
         metrics.incContainerBytesStats(Type.WriteChunk, request.getWriteChunk()
             .getChunkData().getLen());
       }
@@ -633,7 +647,8 @@ public class KeyValueHandler extends Handler {
    * request.
    */
   ContainerCommandResponseProto handlePutSmallFile(
-      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+      ContainerCommandRequestProto request, KeyValueContainer kvContainer,
+      DispatcherContext dispatcherContext) {
 
     if (!request.hasPutSmallFile()) {
       LOG.debug("Malformed Put Small File request. trace ID: {}",
@@ -642,13 +657,14 @@ public class KeyValueHandler extends Handler {
     }
     PutSmallFileRequestProto putSmallFileReq =
         request.getPutSmallFile();
+    BlockData blockData;
 
     try {
       checkContainerOpen(kvContainer);
 
       BlockID blockID = BlockID.getFromProtobuf(putSmallFileReq.getBlock()
           .getBlockData().getBlockID());
-      BlockData blockData = BlockData.getFromProtoBuf(
+      blockData = BlockData.getFromProtoBuf(
           putSmallFileReq.getBlock().getBlockData());
       Preconditions.checkNotNull(blockData);
 
@@ -656,15 +672,20 @@ public class KeyValueHandler extends Handler {
           putSmallFileReq.getChunkInfo());
       Preconditions.checkNotNull(chunkInfo);
       ByteBuffer data = putSmallFileReq.getData().asReadOnlyByteBuffer();
+      WriteChunkStage stage =
+          dispatcherContext == null ? WriteChunkStage.COMBINED :
+              dispatcherContext.getStage();
       // chunks will be committed as a part of handling putSmallFile
       // here. There is no need to maintain this info in openContainerBlockMap.
-      chunkManager.writeChunk(
-          kvContainer, blockID, chunkInfo, data, Stage.COMBINED);
+      chunkManager.writeChunk(kvContainer, blockID, chunkInfo, data, stage);
 
       List<ContainerProtos.ChunkInfo> chunks = new LinkedList<>();
       chunks.add(chunkInfo.getProtoBufMessage());
       blockData.setChunks(chunks);
-      // TODO: add bcsId as a part of putSmallFile transaction
+      long bcsId =
+          dispatcherContext == null ? 0 : dispatcherContext.getLogIndex();
+      blockData.setBlockCommitSequenceId(bcsId);
+
       blockManager.putBlock(kvContainer, blockData);
       metrics.incContainerBytesStats(Type.PutSmallFile, data.capacity());
 
@@ -676,7 +697,7 @@ public class KeyValueHandler extends Handler {
               PUT_SMALL_FILE_ERROR), request);
     }
 
-    return SmallFileUtils.getPutFileResponseSuccess(request);
+    return SmallFileUtils.getPutFileResponseSuccess(request, blockData);
   }
 
   /**
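
The handler hunks above share one null-safe defaulting pattern, so requests
that arrive without a context (the standalone gRPC path passes null) keep the
old behavior; restated compactly (a sketch of the pattern, not new code):

    // null context => old defaults: bcsId 0, no tmp-file read, COMBINED stage
    long bcsId =
        dispatcherContext == null ? 0 : dispatcherContext.getLogIndex();
    boolean readFromTmpFile =
        dispatcherContext != null && dispatcherContext.isReadFromTmpFile();
    WriteChunkStage stage = dispatcherContext == null
        ? WriteChunkStage.COMBINED : dispatcherContext.getStage();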

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a3c7714/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java
index 667e66d..200e8ea 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java
@@ -133,12 +133,12 @@ public final class BlockUtils {
    * @return Response.
    */
   public static ContainerCommandResponseProto putBlockResponseSuccess(
-      ContainerCommandRequestProto msg, long blockLength) {
-    ContainerProtos.BlockData blockData = msg.getPutBlock().getBlockData();
+      ContainerCommandRequestProto msg, BlockData blockData) {
+    ContainerProtos.BlockData blockDataProto = blockData.getProtoBufMessage();
     GetCommittedBlockLengthResponseProto.Builder
         committedBlockLengthResponseBuilder =
-        getCommittedBlockLengthResponseBuilder(blockLength,
-            blockData.getBlockID());
+        getCommittedBlockLengthResponseBuilder(blockData.getSize(),
+            blockDataProto.getBlockID());
     PutBlockResponseProto.Builder putKeyResponse =
         PutBlockResponseProto.newBuilder();
     putKeyResponse

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a3c7714/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/SmallFileUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/SmallFileUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/SmallFileUtils.java
index e91c8a6..ba2b02c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/SmallFileUtils.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/SmallFileUtils.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.container.keyvalue.helpers;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -44,15 +45,14 @@ public final class SmallFileUtils {
    * @return - ContainerCommandResponseProto
    */
   public static ContainerCommandResponseProto getPutFileResponseSuccess(
-      ContainerCommandRequestProto msg) {
+      ContainerCommandRequestProto msg, BlockData blockData) {
     ContainerProtos.PutSmallFileResponseProto.Builder getResponse =
         ContainerProtos.PutSmallFileResponseProto.newBuilder();
-    ContainerProtos.BlockData blockData =
-        msg.getPutSmallFile().getBlock().getBlockData();
+    ContainerProtos.BlockData blockDataProto = blockData.getProtoBufMessage();
     ContainerProtos.GetCommittedBlockLengthResponseProto.Builder
         committedBlockLengthResponseBuilder = BlockUtils
-        .getCommittedBlockLengthResponseBuilder(blockData.getSize(),
-            blockData.getBlockID());
+        .getCommittedBlockLengthResponseBuilder(blockDataProto.getSize(),
+            blockDataProto.getBlockID());
     getResponse.setCommittedBlockLength(committedBlockLengthResponseBuilder);
     ContainerCommandResponseProto.Builder builder =
         ContainerUtils.getSuccessResponseBuilder(msg);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a3c7714/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java
index e41346c..38430af 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java
@@ -21,10 +21,10 @@ package org.apache.hadoop.ozone.container.keyvalue.impl;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.VolumeIOStats;
@@ -66,7 +66,7 @@ public class ChunkManagerImpl implements ChunkManager {
    * @throws StorageContainerException
    */
   public void writeChunk(Container container, BlockID blockID, ChunkInfo info,
-      ByteBuffer data, ContainerProtos.Stage stage)
+      ByteBuffer data, DispatcherContext.WriteChunkStage stage)
       throws StorageContainerException {
 
     try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a3c7714/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
index f90346e..4282e46 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
@@ -19,10 +19,11 @@ package org.apache.hadoop.ozone.container.keyvalue.interfaces;
  */
 
 import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
+
 import java.nio.ByteBuffer;
 
 /**
@@ -42,7 +43,7 @@ public interface ChunkManager {
    * @throws StorageContainerException
    */
   void writeChunk(Container container, BlockID blockID, ChunkInfo info,
-                  ByteBuffer data, ContainerProtos.Stage stage)
+      ByteBuffer data, DispatcherContext.WriteChunkStage stage)
       throws StorageContainerException;
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a3c7714/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
index 9940d4d..933ed70 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
@@ -99,16 +99,16 @@ public class TestHddsDispatcher {
       HddsDispatcher hddsDispatcher = new HddsDispatcher(
           conf, containerSet, volumeSet, handlers, context, metrics);
       hddsDispatcher.setScmId(scmId.toString());
-      ContainerCommandResponseProto responseOne = hddsDispatcher.dispatch(
-          getWriteChunkRequest(dd.getUuidString(), 1L, 1L));
+      ContainerCommandResponseProto responseOne = hddsDispatcher
+          .dispatch(getWriteChunkRequest(dd.getUuidString(), 1L, 1L), null);
       Assert.assertEquals(ContainerProtos.Result.SUCCESS,
           responseOne.getResult());
       verify(context, times(0))
           .addContainerActionIfAbsent(Mockito.any(ContainerAction.class));
       containerData.setBytesUsed(Double.valueOf(
           StorageUnit.MB.toBytes(950)).longValue());
-      ContainerCommandResponseProto responseTwo = hddsDispatcher.dispatch(
-          getWriteChunkRequest(dd.getUuidString(), 1L, 2L));
+      ContainerCommandResponseProto responseTwo = hddsDispatcher
+          .dispatch(getWriteChunkRequest(dd.getUuidString(), 1L, 2L), null);
       Assert.assertEquals(ContainerProtos.Result.SUCCESS,
           responseTwo.getResult());
       verify(context, times(1))
@@ -150,16 +150,16 @@ public class TestHddsDispatcher {
           getWriteChunkRequest(dd.getUuidString(), 1L, 1L);
       // send read chunk request and make sure container does not exist
       ContainerCommandResponseProto response =
-          hddsDispatcher.dispatch(getReadChunkRequest(writeChunkRequest));
+          hddsDispatcher.dispatch(getReadChunkRequest(writeChunkRequest), null);
       Assert.assertEquals(response.getResult(),
           ContainerProtos.Result.CONTAINER_NOT_FOUND);
       // send write chunk request without sending create container
-      response = hddsDispatcher.dispatch(writeChunkRequest);
+      response = hddsDispatcher.dispatch(writeChunkRequest, null);
       // container should be created as part of write chunk request
       Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
       // send read chunk request to read the chunk written above
       response =
-          hddsDispatcher.dispatch(getReadChunkRequest(writeChunkRequest));
+          hddsDispatcher.dispatch(getReadChunkRequest(writeChunkRequest), null);
       Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
       Assert.assertEquals(response.getReadChunk().getData(),
           writeChunkRequest.getWriteChunk().getData());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a3c7714/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java
index ef77204..f17288f 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.WriteChunkStage;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
@@ -114,7 +115,7 @@ public class TestChunkManagerImpl {
     // As no chunks are written to the volume writeBytes should be 0
     checkWriteIOStats(0, 0);
     chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
-        ByteBuffer.wrap(data), ContainerProtos.Stage.WRITE_DATA);
+        ByteBuffer.wrap(data), WriteChunkStage.WRITE_DATA);
     // Now a chunk file is being written with Stage WRITE_DATA, so it should
     // create a temporary chunk file.
     assertTrue(chunksPath.listFiles().length == 1);
@@ -131,7 +132,7 @@ public class TestChunkManagerImpl {
     checkWriteIOStats(data.length, 1);
 
     chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
-        ByteBuffer.wrap(data), ContainerProtos.Stage.COMMIT_DATA);
+        ByteBuffer.wrap(data), WriteChunkStage.COMMIT_DATA);
 
     checkWriteIOStats(data.length, 1);
 
@@ -151,7 +152,7 @@ public class TestChunkManagerImpl {
       chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
           .getLocalID(), 0), 0, randomLength);
       chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
-          ByteBuffer.wrap(data), ContainerProtos.Stage.WRITE_DATA);
+          ByteBuffer.wrap(data), WriteChunkStage.WRITE_DATA);
       fail("testWriteChunkIncorrectLength failed");
     } catch (StorageContainerException ex) {
       // As we got an exception, writeBytes should be 0.
@@ -172,7 +173,7 @@ public class TestChunkManagerImpl {
     assertTrue(chunksPath.listFiles().length == 0);
     checkWriteIOStats(0, 0);
     chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
-        ByteBuffer.wrap(data), ContainerProtos.Stage.COMBINED);
+        ByteBuffer.wrap(data), WriteChunkStage.COMBINED);
     // Now a chunk file is being written with Stage COMBINED_DATA, so it should
     // create a chunk file.
     assertTrue(chunksPath.listFiles().length == 1);
@@ -185,7 +186,7 @@ public class TestChunkManagerImpl {
   public void testReadChunk() throws Exception {
     checkWriteIOStats(0, 0);
     chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
-        ByteBuffer.wrap(data), ContainerProtos.Stage.COMBINED);
+        ByteBuffer.wrap(data), WriteChunkStage.COMBINED);
     checkWriteIOStats(data.length, 1);
     checkReadIOStats(0, 0);
     byte[] expectedData = chunkManager.readChunk(keyValueContainer, blockID,
@@ -199,7 +200,7 @@ public class TestChunkManagerImpl {
   public void testDeleteChunk() throws Exception {
     File chunksPath = new File(keyValueContainerData.getChunksPath());
     chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
-        ByteBuffer.wrap(data), ContainerProtos.Stage.COMBINED);
+        ByteBuffer.wrap(data), WriteChunkStage.COMBINED);
     assertTrue(chunksPath.listFiles().length == 1);
     chunkManager.deleteChunk(keyValueContainer, blockID, chunkInfo);
     assertTrue(chunksPath.listFiles().length == 0);
@@ -209,7 +210,7 @@ public class TestChunkManagerImpl {
   public void testDeleteChunkUnsupportedRequest() throws Exception {
     try {
       chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
-          ByteBuffer.wrap(data), ContainerProtos.Stage.COMBINED);
+          ByteBuffer.wrap(data), WriteChunkStage.COMBINED);
       long randomLength = 200L;
       chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
           .getLocalID(), 0), 0, randomLength);
@@ -241,7 +242,7 @@ public class TestChunkManagerImpl {
       chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
           .getLocalID(), i), 0, data.length);
       chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
-          ByteBuffer.wrap(data), ContainerProtos.Stage.COMBINED);
+          ByteBuffer.wrap(data), WriteChunkStage.COMBINED);
     }
     checkWriteIOStats(data.length*100, 100);
     assertTrue(hddsVolume.getVolumeIOStats().getWriteTime() > 0);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a3c7714/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
index 29d74c2..a2e7f50 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
@@ -84,10 +85,10 @@ public class TestKeyValueHandler {
     handler = Mockito.mock(KeyValueHandler.class);
     dispatcher = Mockito.mock(HddsDispatcher.class);
     Mockito.when(dispatcher.getHandler(any())).thenReturn(handler);
-    Mockito.when(dispatcher.dispatch(any())).thenCallRealMethod();
+    Mockito.when(dispatcher.dispatch(any(), any())).thenCallRealMethod();
     Mockito.when(dispatcher.getContainer(anyLong())).thenReturn(
         Mockito.mock(KeyValueContainer.class));
-    Mockito.when(handler.handle(any(), any())).thenCallRealMethod();
+    Mockito.when(handler.handle(any(), any(), any())).thenCallRealMethod();
     doCallRealMethod().when(dispatcher).setMetricsForTesting(any());
     dispatcher.setMetricsForTesting(Mockito.mock(ContainerMetrics.class));
     Mockito.when(dispatcher.buildAuditMessageForFailure(any(), any(), any()))
@@ -111,112 +112,113 @@ public class TestKeyValueHandler {
             .setCreateContainer(ContainerProtos.CreateContainerRequestProto
                 .getDefaultInstance())
             .build();
-    dispatcher.dispatch(createContainerRequest);
+    DispatcherContext context = new DispatcherContext.Builder().build();
+    dispatcher.dispatch(createContainerRequest, context);
     Mockito.verify(handler, times(1)).handleCreateContainer(
         any(ContainerCommandRequestProto.class), any());
 
     // Test Read Container Request handling
     ContainerCommandRequestProto readContainerRequest =
         getDummyCommandRequestProto(ContainerProtos.Type.ReadContainer);
-    dispatcher.dispatch(readContainerRequest);
+    dispatcher.dispatch(readContainerRequest, context);
     Mockito.verify(handler, times(1)).handleReadContainer(
         any(ContainerCommandRequestProto.class), any());
 
     // Test Update Container Request handling
     ContainerCommandRequestProto updateContainerRequest =
         getDummyCommandRequestProto(ContainerProtos.Type.UpdateContainer);
-    dispatcher.dispatch(updateContainerRequest);
+    dispatcher.dispatch(updateContainerRequest, context);
     Mockito.verify(handler, times(1)).handleUpdateContainer(
         any(ContainerCommandRequestProto.class), any());
 
     // Test Delete Container Request handling
     ContainerCommandRequestProto deleteContainerRequest =
         getDummyCommandRequestProto(ContainerProtos.Type.DeleteContainer);
-    dispatcher.dispatch(deleteContainerRequest);
+    dispatcher.dispatch(deleteContainerRequest, null);
     Mockito.verify(handler, times(1)).handleDeleteContainer(
         any(ContainerCommandRequestProto.class), any());
 
     // Test List Container Request handling
     ContainerCommandRequestProto listContainerRequest =
         getDummyCommandRequestProto(ContainerProtos.Type.ListContainer);
-    dispatcher.dispatch(listContainerRequest);
+    dispatcher.dispatch(listContainerRequest, context);
     Mockito.verify(handler, times(1)).handleUnsupportedOp(
         any(ContainerCommandRequestProto.class));
 
     // Test Close Container Request handling
     ContainerCommandRequestProto closeContainerRequest =
         getDummyCommandRequestProto(ContainerProtos.Type.CloseContainer);
-    dispatcher.dispatch(closeContainerRequest);
+    dispatcher.dispatch(closeContainerRequest, context);
     Mockito.verify(handler, times(1)).handleCloseContainer(
         any(ContainerCommandRequestProto.class), any());
 
     // Test Put Block Request handling
     ContainerCommandRequestProto putBlockRequest =
         getDummyCommandRequestProto(ContainerProtos.Type.PutBlock);
-    dispatcher.dispatch(putBlockRequest);
+    dispatcher.dispatch(putBlockRequest, context);
     Mockito.verify(handler, times(1)).handlePutBlock(
-        any(ContainerCommandRequestProto.class), any());
+        any(ContainerCommandRequestProto.class), any(), any());
 
     // Test Get Block Request handling
     ContainerCommandRequestProto getBlockRequest =
         getDummyCommandRequestProto(ContainerProtos.Type.GetBlock);
-    dispatcher.dispatch(getBlockRequest);
+    dispatcher.dispatch(getBlockRequest, context);
     Mockito.verify(handler, times(1)).handleGetBlock(
         any(ContainerCommandRequestProto.class), any());
 
     // Test Delete Block Request handling
     ContainerCommandRequestProto deleteBlockRequest =
         getDummyCommandRequestProto(ContainerProtos.Type.DeleteBlock);
-    dispatcher.dispatch(deleteBlockRequest);
+    dispatcher.dispatch(deleteBlockRequest, context);
     Mockito.verify(handler, times(1)).handleDeleteBlock(
         any(ContainerCommandRequestProto.class), any());
 
     // Test List Block Request handling
     ContainerCommandRequestProto listBlockRequest =
         getDummyCommandRequestProto(ContainerProtos.Type.ListBlock);
-    dispatcher.dispatch(listBlockRequest);
+    dispatcher.dispatch(listBlockRequest, context);
     Mockito.verify(handler, times(2)).handleUnsupportedOp(
         any(ContainerCommandRequestProto.class));
 
     // Test Read Chunk Request handling
     ContainerCommandRequestProto readChunkRequest =
         getDummyCommandRequestProto(ContainerProtos.Type.ReadChunk);
-    dispatcher.dispatch(readChunkRequest);
+    dispatcher.dispatch(readChunkRequest, context);
     Mockito.verify(handler, times(1)).handleReadChunk(
-        any(ContainerCommandRequestProto.class), any());
+        any(ContainerCommandRequestProto.class), any(), any());
 
     // Test Delete Chunk Request handling
     ContainerCommandRequestProto deleteChunkRequest =
         getDummyCommandRequestProto(ContainerProtos.Type.DeleteChunk);
-    dispatcher.dispatch(deleteChunkRequest);
+    dispatcher.dispatch(deleteChunkRequest, context);
     Mockito.verify(handler, times(1)).handleDeleteChunk(
         any(ContainerCommandRequestProto.class), any());
 
     // Test Write Chunk Request handling
     ContainerCommandRequestProto writeChunkRequest =
         getDummyCommandRequestProto(ContainerProtos.Type.WriteChunk);
-    dispatcher.dispatch(writeChunkRequest);
+    dispatcher.dispatch(writeChunkRequest, context);
     Mockito.verify(handler, times(1)).handleWriteChunk(
-        any(ContainerCommandRequestProto.class), any());
+        any(ContainerCommandRequestProto.class), any(), any());
 
     // Test List Chunk Request handling
     ContainerCommandRequestProto listChunkRequest =
         getDummyCommandRequestProto(ContainerProtos.Type.ListChunk);
-    dispatcher.dispatch(listChunkRequest);
+    dispatcher.dispatch(listChunkRequest, context);
     Mockito.verify(handler, times(3)).handleUnsupportedOp(
         any(ContainerCommandRequestProto.class));
 
     // Test Put Small File Request handling
     ContainerCommandRequestProto putSmallFileRequest =
         getDummyCommandRequestProto(ContainerProtos.Type.PutSmallFile);
-    dispatcher.dispatch(putSmallFileRequest);
+    dispatcher.dispatch(putSmallFileRequest, context);
     Mockito.verify(handler, times(1)).handlePutSmallFile(
-        any(ContainerCommandRequestProto.class), any());
+        any(ContainerCommandRequestProto.class), any(), any());
 
     // Test Get Small File Request handling
     ContainerCommandRequestProto getSmallFileRequest =
         getDummyCommandRequestProto(ContainerProtos.Type.GetSmallFile);
-    dispatcher.dispatch(getSmallFileRequest);
+    dispatcher.dispatch(getSmallFileRequest, context);
     Mockito.verify(handler, times(1)).handleGetSmallFile(
         any(ContainerCommandRequestProto.class), any());
   }
@@ -294,7 +296,7 @@ public class TestKeyValueHandler {
             .setCloseContainer(ContainerProtos.CloseContainerRequestProto
                 .getDefaultInstance())
             .build();
-    dispatcher.dispatch(closeContainerRequest);
+    dispatcher.dispatch(closeContainerRequest, null);
 
     Mockito.when(handler.handleCloseContainer(any(), any()))
         .thenCallRealMethod();
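
The mocks above capture the new calling convention: dispatch takes the
request plus a DispatcherContext, and null is accepted where no Ratis state
exists (see the delete-container and close-container calls). A minimal
usage sketch under those assumptions follows; the helper class and method
names are hypothetical, and only the no-argument Builder shown in the test
is relied on:

    import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
    import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
    import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
    import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;

    // Hypothetical helper, not part of the patch: one place to build the
    // context and hand a request to the dispatcher.
    final class DispatchSketch {
      static ContainerCommandResponseProto send(HddsDispatcher dispatcher,
          ContainerCommandRequestProto request) {
        // Empty context, built exactly as in the test above; Ratis callers
        // would populate it before dispatching.
        DispatcherContext context = new DispatcherContext.Builder().build();
        return dispatcher.dispatch(request, context);
      }
    }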

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a3c7714/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
index 5d19a10..1637ff7 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
@@ -78,7 +78,7 @@ import java.util.UUID;
 
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.BCSID_MISMATCH;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNKNOWN_BCSID;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Stage.COMBINED;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.WriteChunkStage;
 import static org.apache.hadoop.ozone.container.ContainerTestHelper.getChunk;
 import static org.apache.hadoop.ozone.container.ContainerTestHelper.getData;
 import static org.apache.hadoop.ozone.container.ContainerTestHelper.setDataChecksum;
@@ -334,7 +334,7 @@ public class TestContainerPersistence {
     byte[] data = getData(datalen);
     setDataChecksum(info, data);
     chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
-        COMBINED);
+        WriteChunkStage.COMBINED);
     return info;
 
   }
@@ -375,7 +375,7 @@ public class TestContainerPersistence {
       byte[] data = getData(datalen);
       setDataChecksum(info, data);
       chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
-          COMBINED);
+          WriteChunkStage.COMBINED);
       String fileName = String.format("%s.data.%d", blockID.getLocalID(), x);
       fileHashMap.put(fileName, info);
     }
@@ -433,7 +433,7 @@ public class TestContainerPersistence {
     byte[] data = getData(datalen);
     setDataChecksum(info, data);
     chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
-        COMBINED);
+        WriteChunkStage.COMBINED);
 
     byte[] readData = chunkManager.readChunk(container, blockID, info, false);
     assertTrue(Arrays.equals(data, readData));
@@ -466,13 +466,13 @@ public class TestContainerPersistence {
     byte[] data = getData(datalen);
     setDataChecksum(info, data);
     chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
-        COMBINED);
+        WriteChunkStage.COMBINED);
     chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
-        COMBINED);
+        WriteChunkStage.COMBINED);
     // With the overwrite flag it should work now.
     info.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
     chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
-        COMBINED);
+        WriteChunkStage.COMBINED);
     long bytesUsed = container.getContainerData().getBytesUsed();
     Assert.assertEquals(datalen, bytesUsed);
 
@@ -507,7 +507,7 @@ public class TestContainerPersistence {
       oldSha.update(data);
       setDataChecksum(info, data);
       chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
-          COMBINED);
+          WriteChunkStage.COMBINED);
     }
 
     // Request to read the whole data in a single go.
@@ -540,7 +540,7 @@ public class TestContainerPersistence {
     byte[] data = getData(datalen);
     setDataChecksum(info, data);
     chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
-        COMBINED);
+        WriteChunkStage.COMBINED);
     chunkManager.deleteChunk(container, blockID, info);
     exception.expect(StorageContainerException.class);
     exception.expectMessage("Unable to find the chunk file.");
@@ -655,7 +655,7 @@ public class TestContainerPersistence {
       byte[] data = getData(datalen);
       setDataChecksum(info, data);
       chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
-          COMBINED);
+          WriteChunkStage.COMBINED);
       totalSize += datalen;
       chunkList.add(info);
     }
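
Every write in the persistence tests above now threads
WriteChunkStage.COMBINED through chunkManager.writeChunk. A compact helper
capturing that repeated pattern; the method name and parameter types are
assumptions for illustration, while every call it makes appears verbatim
in the hunks above:

    // Hypothetical test helper: checksum the data, then write it in the
    // single-step COMBINED stage, as the tests above do repeatedly.
    static ChunkInfo writeCombined(ChunkManager chunkManager,
        Container container, BlockID blockID, ChunkInfo info, byte[] data)
        throws Exception {
      setDataChecksum(info, data);
      chunkManager.writeChunk(container, blockID, info,
          ByteBuffer.wrap(data), WriteChunkStage.COMBINED);
      return info;
    }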

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a3c7714/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java
index ab2ddf0..43606f0 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java
@@ -169,7 +169,8 @@ public class TestCSMMetrics {
      */
     @Override
     public ContainerCommandResponseProto dispatch(
-        ContainerCommandRequestProto msg) {
+        ContainerCommandRequestProto msg,
+        DispatcherContext context) {
       return ContainerTestHelper.getCreateContainerResponse(msg);
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a3c7714/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
index 3e98594..1675484 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
 import org.apache.hadoop.ozone.container.common.interfaces.Handler;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.hadoop.ozone.container.replication.GrpcReplicationService;
@@ -236,8 +237,9 @@ public class TestContainerServer {
      * @return Command Response
      */
     @Override
-    public ContainerCommandResponseProto
-        dispatch(ContainerCommandRequestProto msg)  {
+    public ContainerCommandResponseProto dispatch(
+        ContainerCommandRequestProto msg,
+        DispatcherContext context) {
       return ContainerTestHelper.getCreateContainerResponse(msg);
     }
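
Both test servers stub the same reshaped dispatch method. Below is a no-op
dispatcher in the style of those stubs; it deliberately omits "implements
ContainerDispatcher", since the interface's remaining methods are not shown
in this patch, and only the signature is taken from the hunks:

    // Echo-style stub mirroring the test servers above. The context may
    // legitimately be null: the standalone call sites in this patch pass
    // null, so implementations must not dereference it unconditionally.
    public class EchoDispatcher {
      public ContainerCommandResponseProto dispatch(
          ContainerCommandRequestProto msg, DispatcherContext context) {
        return ContainerTestHelper.getCreateContainerResponse(msg);
      }
    }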
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a3c7714/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java
index 01b51fa..91d0968 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java
@@ -130,7 +130,7 @@ public class BenchMarkDatanodeDispatcher {
     for (int x = 0; x < INIT_CONTAINERS; x++) {
       long containerID = HddsUtils.getUtcTime() + x;
       ContainerCommandRequestProto req = getCreateContainerCommand(containerID);
-      dispatcher.dispatch(req);
+      dispatcher.dispatch(req, null);
       containers.add(containerID);
       containerCount.getAndIncrement();
     }
@@ -153,8 +153,8 @@ public class BenchMarkDatanodeDispatcher {
         long containerID = containers.get(y);
         BlockID  blockID = new BlockID(containerID, key);
         dispatcher
-            .dispatch(getPutBlockCommand(blockID, chunkName));
-        dispatcher.dispatch(getWriteChunkCommand(blockID, chunkName));
+            .dispatch(getPutBlockCommand(blockID, chunkName), null);
+        dispatcher.dispatch(getWriteChunkCommand(blockID, chunkName), null);
       }
     }
   }
@@ -268,7 +268,7 @@ public class BenchMarkDatanodeDispatcher {
   public void createContainer(BenchMarkDatanodeDispatcher bmdd) {
     long containerID = RandomUtils.nextLong();
     ContainerCommandRequestProto req = getCreateContainerCommand(containerID);
-    bmdd.dispatcher.dispatch(req);
+    bmdd.dispatcher.dispatch(req, null);
     bmdd.containers.add(containerID);
     bmdd.containerCount.getAndIncrement();
   }
@@ -277,27 +277,27 @@ public class BenchMarkDatanodeDispatcher {
   @Benchmark
   public void writeChunk(BenchMarkDatanodeDispatcher bmdd) {
     bmdd.dispatcher.dispatch(getWriteChunkCommand(
-        getRandomBlockID(), getNewChunkToWrite()));
+        getRandomBlockID(), getNewChunkToWrite()), null);
   }
 
   @Benchmark
   public void readChunk(BenchMarkDatanodeDispatcher bmdd) {
     BlockID blockID = getRandomBlockID();
     String chunkKey = getRandomChunkToRead();
-    bmdd.dispatcher.dispatch(getReadChunkCommand(blockID, chunkKey));
+    bmdd.dispatcher.dispatch(getReadChunkCommand(blockID, chunkKey), null);
   }
 
   @Benchmark
   public void putBlock(BenchMarkDatanodeDispatcher bmdd) {
     BlockID blockID = getRandomBlockID();
     String chunkKey = getNewChunkToWrite();
-    bmdd.dispatcher.dispatch(getPutBlockCommand(blockID, chunkKey));
+    bmdd.dispatcher.dispatch(getPutBlockCommand(blockID, chunkKey), null);
   }
 
   @Benchmark
   public void getBlock(BenchMarkDatanodeDispatcher bmdd) {
     BlockID blockID = getRandomBlockID();
-    bmdd.dispatcher.dispatch(getGetBlockCommand(blockID));
+    bmdd.dispatcher.dispatch(getGetBlockCommand(blockID), null);
   }
 
  // Chunk writes from the benchmark only reach certain containers

