http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
deleted file mode 100644
index 12c196b..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ /dev/null
@@ -1,504 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.ozone.container.common.statemachine;
-
-import com.google.common.base.Preconditions;
-import com.google.protobuf.GeneratedMessage;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.PipelineAction;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerAction;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
-import org.apache.hadoop.ozone.container.common.states.DatanodeState;
-import org.apache.hadoop.ozone.container.common.states.datanode
-    .InitDatanodeState;
-import org.apache.hadoop.ozone.container.common.states.datanode
-    .RunningDatanodeState;
-import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
-import org.apache.hadoop.ozone.protocol.commands.CommandStatus
-    .CommandStatusBuilder;
-import org.apache.hadoop.ozone.protocol.commands
-    .DeleteBlockCommandStatus.DeleteBlockCommandStatusBuilder;
-import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
-
-import static java.lang.Math.min;
-import static org.apache.hadoop.hdds.scm.HddsServerUtil.getScmHeartbeatInterval;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-import java.util.ArrayList;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Consumer;
-
-import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT;
-
-/**
- * Current Context of State Machine.
- */
-public class StateContext {
-  static final Logger LOG =
-      LoggerFactory.getLogger(StateContext.class);
-  private final Queue<SCMCommand> commandQueue;
-  private final Map<Long, CommandStatus> cmdStatusMap;
-  private final Lock lock;
-  private final DatanodeStateMachine parent;
-  private final AtomicLong stateExecutionCount;
-  private final Configuration conf;
-  private final List<GeneratedMessage> reports;
-  private final Queue<ContainerAction> containerActions;
-  private final Queue<PipelineAction> pipelineActions;
-  private DatanodeStateMachine.DatanodeStates state;
-
-  /**
-   * Starting with a 2 second heartbeat frequency, which is updated to the
-   * real heartbeat frequency after SCM registration. This makes the
-   * initial registration significantly faster.
-   */
-  private AtomicLong heartbeatFrequency = new AtomicLong(2000);
-
-  /**
-   * Constructs a StateContext.
-   *
-   * @param conf   - Configuration
-   * @param state  - State
-   * @param parent Parent State Machine
-   */
-  public StateContext(Configuration conf, DatanodeStateMachine.DatanodeStates
-      state, DatanodeStateMachine parent) {
-    this.conf = conf;
-    this.state = state;
-    this.parent = parent;
-    commandQueue = new LinkedList<>();
-    cmdStatusMap = new ConcurrentHashMap<>();
-    reports = new LinkedList<>();
-    containerActions = new LinkedList<>();
-    pipelineActions = new LinkedList<>();
-    lock = new ReentrantLock();
-    stateExecutionCount = new AtomicLong(0);
-  }
-
-  /**
-   * Returns the DatanodeStateMachine that holds this context.
-   *
-   * @return DatanodeStateMachine.
-   */
-  public DatanodeStateMachine getParent() {
-    return parent;
-  }
-
-  /**
-   * Get the container server port.
-   * @return the container server port if available, or -1 otherwise.
-   */
-  public int getContainerPort() {
-    return parent == null ?
-        INVALID_PORT : parent.getContainer().getContainerServerPort();
-  }
-
-  /**
-   * Gets the Ratis Port.
-   * @return the Ratis port, or -1 if not valid.
-   */
-  public int getRatisPort() {
-    return parent == null ?
-        INVALID_PORT : parent.getContainer().getRatisContainerServerPort();
-  }
-
-  /**
-   * Returns true if we are entering a new state.
-   *
-   * @return boolean
-   */
-  boolean isEntering() {
-    return stateExecutionCount.get() == 0;
-  }
-
-  /**
-   * Returns true if we are exiting from the current state.
-   *
-   * @param newState - newState.
-   * @return boolean
-   */
-  boolean isExiting(DatanodeStateMachine.DatanodeStates newState) {
-    boolean isExiting = state != newState && stateExecutionCount.get() > 0;
-    if(isExiting) {
-      stateExecutionCount.set(0);
-    }
-    return isExiting;
-  }
-
-  /**
-   * Returns the current state the machine is in.
-   *
-   * @return state.
-   */
-  public DatanodeStateMachine.DatanodeStates getState() {
-    return state;
-  }
-
-  /**
-   * Sets the current state of the machine.
-   *
-   * @param state state.
-   */
-  public void setState(DatanodeStateMachine.DatanodeStates state) {
-    this.state = state;
-  }
-
-  /**
-   * Adds the report to report queue.
-   *
-   * @param report report to be added
-   */
-  public void addReport(GeneratedMessage report) {
-    if (report != null) {
-      synchronized (reports) {
-        reports.add(report);
-      }
-    }
-  }
-
-  /**
-   * Adds the reports which could not be sent by heartbeat back to the
-   * reports list.
-   *
-   * @param reportsToPutBack list of reports which failed to be sent by
-   *                         heartbeat.
-   */
-  public void putBackReports(List<GeneratedMessage> reportsToPutBack) {
-    synchronized (reports) {
-      reports.addAll(0, reportsToPutBack);
-    }
-  }
-
-  /**
-   * Returns all the available reports from the report queue, or empty list if
-   * the queue is empty.
-   *
-   * @return list of reports
-   */
-  public List<GeneratedMessage> getAllAvailableReports() {
-    return getReports(Integer.MAX_VALUE);
-  }
-
-  /**
-   * Returns available reports from the report queue with a max limit on
-   * list size, or empty list if the queue is empty.
-   *
-   * @return list of reports
-   */
-  public List<GeneratedMessage> getReports(int maxLimit) {
-    List<GeneratedMessage> reportsToReturn = new LinkedList<>();
-    synchronized (reports) {
-      List<GeneratedMessage> tempList = reports.subList(
-          0, min(reports.size(), maxLimit));
-      reportsToReturn.addAll(tempList);
-      tempList.clear();
-    }
-    return reportsToReturn;
-  }
-
-
-  /**
-   * Adds the ContainerAction to ContainerAction queue.
-   *
-   * @param containerAction ContainerAction to be added
-   */
-  public void addContainerAction(ContainerAction containerAction) {
-    synchronized (containerActions) {
-      containerActions.add(containerAction);
-    }
-  }
-
-  /**
-   * Add ContainerAction to ContainerAction queue if it's not present.
-   *
-   * @param containerAction ContainerAction to be added
-   */
-  public void addContainerActionIfAbsent(ContainerAction containerAction) {
-    synchronized (containerActions) {
-      if (!containerActions.contains(containerAction)) {
-        containerActions.add(containerAction);
-      }
-    }
-  }
-
-  /**
-   * Returns all the pending ContainerActions from the ContainerAction queue,
-   * or empty list if the queue is empty.
-   *
-   * @return List<ContainerAction>
-   */
-  public List<ContainerAction> getAllPendingContainerActions() {
-    return getPendingContainerAction(Integer.MAX_VALUE);
-  }
-
-  /**
-   * Returns pending ContainerActions from the ContainerAction queue with a
-   * max limit on list size, or empty list if the queue is empty.
-   *
-   * @return List<ContainerAction>
-   */
-  public List<ContainerAction> getPendingContainerAction(int maxLimit) {
-    List<ContainerAction> containerActionList = new ArrayList<>();
-    synchronized (containerActions) {
-      if (!containerActions.isEmpty()) {
-        int size = containerActions.size();
-        int limit = size > maxLimit ? maxLimit : size;
-        for (int count = 0; count < limit; count++) {
-          // we need to remove the action from the containerAction queue
-          // as well
-          ContainerAction action = containerActions.poll();
-          Preconditions.checkNotNull(action);
-          containerActionList.add(action);
-        }
-      }
-      return containerActionList;
-    }
-  }
-
-  /**
-   * Add PipelineAction to PipelineAction queue if it's not present.
-   *
-   * @param pipelineAction PipelineAction to be added
-   */
-  public void addPipelineActionIfAbsent(PipelineAction pipelineAction) {
-    synchronized (pipelineActions) {
-      /*
-       * If the pipelineAction queue already contains an entry for the same
-       * pipeline id with the same action, just return.
-       * Note: we cannot use pipelineActions.contains(pipelineAction) here,
-       * because pipelineAction carries a message string; two entries with
-       * the same action on the same pipeline but different messages would
-       * not be equal, and the action would be added multiple times.
-       */
-      for (PipelineAction pipelineActionIter : pipelineActions) {
-        if (pipelineActionIter.getAction() == pipelineAction.getAction()
-            && pipelineActionIter.hasClosePipeline() && pipelineAction
-            .hasClosePipeline()
-            && pipelineActionIter.getClosePipeline().getPipelineID()
-            .equals(pipelineAction.getClosePipeline().getPipelineID())) {
-          return;
-        }
-      }
-      pipelineActions.add(pipelineAction);
-    }
-  }
-
-  /**
-   * Returns pending PipelineActions from the PipelineAction queue with a
-   * max limit on list size, or empty list if the queue is empty.
-   *
-   * @return List<PipelineAction>
-   */
-  public List<PipelineAction> getPendingPipelineAction(int maxLimit) {
-    List<PipelineAction> pipelineActionList = new ArrayList<>();
-    synchronized (pipelineActions) {
-      if (!pipelineActions.isEmpty()) {
-        int size = pipelineActions.size();
-        int limit = size > maxLimit ? maxLimit : size;
-        for (int count = 0; count < limit; count++) {
-          pipelineActionList.add(pipelineActions.poll());
-        }
-      }
-      return pipelineActionList;
-    }
-  }
-
-  /**
-   * Returns the next task to get executed by the datanode state machine.
-   * @return A callable that will be executed by the
-   * {@link DatanodeStateMachine}
-   */
-  @SuppressWarnings("unchecked")
-  public DatanodeState<DatanodeStateMachine.DatanodeStates> getTask() {
-    switch (this.state) {
-    case INIT:
-      return new InitDatanodeState(this.conf, parent.getConnectionManager(),
-          this);
-    case RUNNING:
-      return new RunningDatanodeState(this.conf, parent.getConnectionManager(),
-          this);
-    case SHUTDOWN:
-      return null;
-    default:
-      throw new IllegalArgumentException("Not Implemented yet.");
-    }
-  }
-
-  /**
-   * Executes the required state function.
-   *
-   * @param service - Executor Service
-   * @param time    - time to wait
-   * @param unit    - unit of time.
-   * @throws InterruptedException
-   * @throws ExecutionException
-   * @throws TimeoutException
-   */
-  public void execute(ExecutorService service, long time, TimeUnit unit)
-      throws InterruptedException, ExecutionException, TimeoutException {
-    stateExecutionCount.incrementAndGet();
-    DatanodeState<DatanodeStateMachine.DatanodeStates> task = getTask();
-    if (this.isEntering()) {
-      task.onEnter();
-    }
-    task.execute(service);
-    DatanodeStateMachine.DatanodeStates newState = task.await(time, unit);
-    if (this.state != newState) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Task {} executed, state transited from {} to {}",
-            task.getClass().getSimpleName(), this.state, newState);
-      }
-      if (isExiting(newState)) {
-        task.onExit();
-      }
-      this.setState(newState);
-    }
-  }
-
-  /**
-   * Returns the next command or null if it is empty.
-   *
-   * @return SCMCommand or Null.
-   */
-  public SCMCommand getNextCommand() {
-    lock.lock();
-    try {
-      return commandQueue.poll();
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  /**
-   * Adds a command to the State Machine queue.
-   *
-   * @param command - SCMCommand.
-   */
-  public void addCommand(SCMCommand command) {
-    lock.lock();
-    try {
-      commandQueue.add(command);
-    } finally {
-      lock.unlock();
-    }
-    this.addCmdStatus(command);
-  }
-
-  /**
-   * Returns the count of the Execution.
-   * @return long
-   */
-  public long getExecutionCount() {
-    return stateExecutionCount.get();
-  }
-
-  /**
-   * Returns the {@link CommandStatus} for the given command id, or null if
-   * not present.
-   *
-   * @return {@link CommandStatus} or null.
-   */
-  public CommandStatus getCmdStatus(Long key) {
-    return cmdStatusMap.get(key);
-  }
-
-  /**
-   * Adds a {@link CommandStatus} to the State Machine.
-   *
-   * @param status - {@link CommandStatus}.
-   */
-  public void addCmdStatus(Long key, CommandStatus status) {
-    cmdStatusMap.put(key, status);
-  }
-
-  /**
-   * Adds a {@link CommandStatus} to the State Machine for given SCMCommand.
-   *
-   * @param cmd - {@link SCMCommand}.
-   */
-  public void addCmdStatus(SCMCommand cmd) {
-    CommandStatusBuilder statusBuilder;
-    if (cmd.getType() == Type.deleteBlocksCommand) {
-      statusBuilder = new DeleteBlockCommandStatusBuilder();
-    } else {
-      statusBuilder = CommandStatusBuilder.newBuilder();
-    }
-    this.addCmdStatus(cmd.getId(),
-        statusBuilder.setCmdId(cmd.getId())
-            .setStatus(Status.PENDING)
-            .setType(cmd.getType())
-            .build());
-  }
-
-  /**
-   * Get map holding all {@link CommandStatus} objects.
-   *
-   */
-  public Map<Long, CommandStatus> getCommandStatusMap() {
-    return cmdStatusMap;
-  }
-
-  /**
-   * Remove object from cache in StateContext#cmdStatusMap.
-   *
-   */
-  public void removeCommandStatus(Long cmdId) {
-    cmdStatusMap.remove(cmdId);
-  }
-
-  /**
-   * Updates the status of a pending command.
-   * @param cmdId       command id
-   * @param cmdStatusUpdater Consumer to update command status.
-   * @return true if command status updated successfully else false.
-   */
-  public boolean updateCommandStatus(Long cmdId,
-      Consumer<CommandStatus> cmdStatusUpdater) {
-    if(cmdStatusMap.containsKey(cmdId)) {
-      cmdStatusUpdater.accept(cmdStatusMap.get(cmdId));
-      return true;
-    }
-    return false;
-  }
-
-  public void configureHeartbeatFrequency(){
-    heartbeatFrequency.set(getScmHeartbeatInterval(conf));
-  }
-
-  /**
-   * Return current heartbeat frequency in ms.
-   */
-  public long getHeartbeatFrequency() {
-    return heartbeatFrequency.get();
-  }
-}
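
For reference, a short usage sketch of the report queue above; `nodeReport` and `sendHeartbeat` are hypothetical stand-ins, not names from this codebase. Reports that fail to go out with a heartbeat are re-queued at the head via putBackReports(), so they are retried before any newer reports:

// Sketch only: nodeReport and sendHeartbeat are hypothetical.
StateContext ctx = new StateContext(new Configuration(),
    DatanodeStateMachine.DatanodeStates.INIT, null);
ctx.addReport(nodeReport);

List<GeneratedMessage> toSend = ctx.getReports(10); // drain at most 10
try {
  sendHeartbeat(toSend);
} catch (IOException e) {
  ctx.putBackReports(toSend); // re-queue at the head for the next attempt
}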

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
deleted file mode 100644
index 2c3db61..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
-
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.CloseContainerCommandProto;
-import org.apache.hadoop.ozone.container.common.statemachine
-    .SCMConnectionManager;
-import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
-import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
-import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
-import org.apache.hadoop.util.Time;
-import org.apache.ratis.protocol.NotLeaderException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.UUID;
-
-/**
- * Handler for close container command received from SCM.
- */
-public class CloseContainerCommandHandler implements CommandHandler {
-  static final Logger LOG =
-      LoggerFactory.getLogger(CloseContainerCommandHandler.class);
-  private int invocationCount;
-  private long totalTime;
-  private boolean cmdExecuted;
-
-  /**
-   * Constructs a CloseContainerCommandHandler.
-   */
-  public CloseContainerCommandHandler() {
-  }
-
-  /**
-   * Handles a given SCM command.
-   *
-   * @param command           - SCM Command
-   * @param container         - Ozone Container.
-   * @param context           - Current Context.
-   * @param connectionManager - The SCMs that we are talking to.
-   */
-  @Override
-  public void handle(SCMCommand command, OzoneContainer container,
-      StateContext context, SCMConnectionManager connectionManager) {
-    LOG.debug("Processing Close Container command.");
-    invocationCount++;
-    cmdExecuted = false;
-    long startTime = Time.monotonicNow();
-    // TODO: define this as INVALID_CONTAINER_ID in HddsConsts.java (TBA)
-    long containerID = -1;
-    try {
-
-      CloseContainerCommandProto
-          closeContainerProto =
-          CloseContainerCommandProto
-              .parseFrom(command.getProtoBufMessage());
-      containerID = closeContainerProto.getContainerID();
-      if (container.getContainerSet().getContainer(containerID)
-          .getContainerData().isClosed()) {
-        LOG.debug("Container {} is already closed", containerID);
-        // It might happen that the first attempt to close the container
-        // failed with NOT_LEADER_EXCEPTION, in which case SCM retries to
-        // verify that the container really got closed via Ratis. If, on
-        // such a retry, the container is already closed via Ratis, we
-        // should just return.
-        cmdExecuted = true;
-        return;
-      }
-      HddsProtos.PipelineID pipelineID = closeContainerProto.getPipelineID();
-      HddsProtos.ReplicationType replicationType =
-          closeContainerProto.getReplicationType();
-
-      ContainerProtos.ContainerCommandRequestProto.Builder request =
-          ContainerProtos.ContainerCommandRequestProto.newBuilder();
-      request.setCmdType(ContainerProtos.Type.CloseContainer);
-      request.setContainerID(containerID);
-      request.setCloseContainer(
-          ContainerProtos.CloseContainerRequestProto.getDefaultInstance());
-      request.setTraceID(UUID.randomUUID().toString());
-      request.setDatanodeUuid(
-          context.getParent().getDatanodeDetails().getUuidString());
-      // submit the close container request for the XceiverServer to handle
-      container.submitContainerRequest(
-          request.build(), replicationType, pipelineID);
-    } catch (Exception e) {
-      if (e instanceof NotLeaderException) {
-        // If this datanode is not the Ratis leader, the close container
-        // command will not be executed by the follower directly, but via
-        // Ratis state machine transactions replicated from leader to
-        // follower. The datanode may also be in candidate state. In these
-        // situations, NotLeaderException is thrown. Remove the status from
-        // the cmdStatus map here so that the command is retried only by SCM
-        // if the leader could not close the container after a certain time.
-        context.removeCommandStatus(containerID);
-        LOG.info(e.getLocalizedMessage());
-      } else {
-        LOG.error("Can't close container " + containerID, e);
-        cmdExecuted = false;
-      }
-    } finally {
-      updateCommandStatus(context, command,
-          (cmdStatus) -> cmdStatus.setStatus(cmdExecuted), LOG);
-      long endTime = Time.monotonicNow();
-      totalTime += endTime - startTime;
-    }
-  }
-
-  /**
-   * Returns the command type that this command handler handles.
-   *
-   * @return Type
-   */
-  @Override
-  public SCMCommandProto.Type getCommandType() {
-    return SCMCommandProto.Type.closeContainerCommand;
-  }
-
-  /**
-   * Returns number of times this handler has been invoked.
-   *
-   * @return int
-   */
-  @Override
-  public int getInvocationCount() {
-    return invocationCount;
-  }
-
-  /**
-   * Returns the average time this function takes to run.
-   *
-   * @return long
-   */
-  @Override
-  public long getAverageRunTime() {
-    if (invocationCount > 0) {
-      return totalTime / invocationCount;
-    }
-    return 0;
-  }
-}
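
The early return for already-closed containers makes this handler idempotent under SCM retries. A minimal sketch of that check, using only accessors the handler itself calls, and assuming the container id is known to the ContainerSet (getContainer() may return null otherwise):

// Sketch: true when a close command can be acknowledged without submitting
// a new close request, because the container is already closed.
private static boolean isAlreadyClosed(OzoneContainer container,
    long containerID) {
  return container.getContainerSet().getContainer(containerID)
      .getContainerData().isClosed();
}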

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
deleted file mode 100644
index 5163d98..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
-import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
-import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
-import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
-import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Dispatches command to the correct handler.
- */
-public final class CommandDispatcher {
-  static final Logger LOG =
-      LoggerFactory.getLogger(CommandDispatcher.class);
-  private final StateContext context;
-  private final Map<Type, CommandHandler> handlerMap;
-  private final OzoneContainer container;
-  private final SCMConnectionManager connectionManager;
-
-  /**
-   * Constructs a command dispatcher.
-   *
-   * @param container - Ozone Container
-   * @param connectionManager - SCM connection manager
-   * @param context - Context
-   * @param handlers - Set of handlers.
-   */
-  private CommandDispatcher(OzoneContainer container, SCMConnectionManager
-      connectionManager, StateContext context,
-      CommandHandler... handlers) {
-    Preconditions.checkNotNull(context);
-    Preconditions.checkNotNull(handlers);
-    Preconditions.checkArgument(handlers.length > 0);
-    Preconditions.checkNotNull(container);
-    Preconditions.checkNotNull(connectionManager);
-    this.context = context;
-    this.container = container;
-    this.connectionManager = connectionManager;
-    handlerMap = new HashMap<>();
-    for (CommandHandler h : handlers) {
-      if (handlerMap.containsKey(h.getCommandType())) {
-        LOG.error("Duplicate handler for the same command. Exiting. Handler " +
-            "key: {}", h.getCommandType().getDescriptorForType().getName());
-        throw new IllegalArgumentException("Duplicate handler for the same " +
-            "command.");
-      }
-      handlerMap.put(h.getCommandType(), h);
-    }
-  }
-
-  public CommandHandler getCloseContainerHandler() {
-    return handlerMap.get(Type.closeContainerCommand);
-  }
-
-  /**
-   * Dispatch the command to the correct handler.
-   *
-   * @param command - SCM Command.
-   */
-  public void handle(SCMCommand command) {
-    Preconditions.checkNotNull(command);
-    CommandHandler handler = handlerMap.get(command.getType());
-    if (handler != null) {
-      handler.handle(command, container, context, connectionManager);
-    } else {
-      LOG.error("Unknown SCM Command queued. There is no handler for this " +
-          "command. Command: {}", command.getType().getDescriptorForType()
-          .getName());
-    }
-  }
-
-  public static Builder newBuilder() {
-    return new Builder();
-  }
-
-  /**
-   * Helper class to construct command dispatcher.
-   */
-  public static class Builder {
-    private final List<CommandHandler> handlerList;
-    private OzoneContainer container;
-    private StateContext context;
-    private SCMConnectionManager connectionManager;
-
-    public Builder() {
-      handlerList = new LinkedList<>();
-    }
-
-    /**
-     * Adds a handler.
-     *
-     * @param handler - handler
-     * @return Builder
-     */
-    public Builder addHandler(CommandHandler handler) {
-      Preconditions.checkNotNull(handler);
-      handlerList.add(handler);
-      return this;
-    }
-
-    /**
-     * Add the OzoneContainer.
-     *
-     * @param ozoneContainer - ozone container.
-     * @return Builder
-     */
-    public Builder setContainer(OzoneContainer ozoneContainer) {
-      Preconditions.checkNotNull(ozoneContainer);
-      this.container = ozoneContainer;
-      return this;
-    }
-
-    /**
-     * Set the Connection Manager.
-     *
-     * @param scmConnectionManager - SCM connection manager.
-     * @return this
-     */
-    public Builder setConnectionManager(SCMConnectionManager
-        scmConnectionManager) {
-      Preconditions.checkNotNull(scmConnectionManager);
-      this.connectionManager = scmConnectionManager;
-      return this;
-    }
-
-    /**
-     * Sets the Context.
-     *
-     * @param stateContext - StateContext
-     * @return this
-     */
-    public Builder setContext(StateContext stateContext) {
-      Preconditions.checkNotNull(stateContext);
-      this.context = stateContext;
-      return this;
-    }
-
-    /**
-     * Builds a command Dispatcher.
-     * @return Command Dispatcher.
-     */
-    public CommandDispatcher build() {
-      Preconditions.checkNotNull(this.connectionManager, "Missing connection" +
-          " manager.");
-      Preconditions.checkNotNull(this.container, "Missing container.");
-      Preconditions.checkNotNull(this.context, "Missing context.");
-      Preconditions.checkArgument(this.handlerList.size() > 0);
-      return new CommandDispatcher(this.container, this.connectionManager,
-          this.context, handlerList.toArray(
-              new CommandHandler[handlerList.size()]));
-    }
-  }
-}
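
For reference, a wiring sketch for the builder above; the handler set and the surrounding variables (containerSet, conf, ozoneContainer, connectionManager, stateContext) are illustrative, not the exact set the datanode registers:

CommandDispatcher dispatcher = CommandDispatcher.newBuilder()
    .addHandler(new CloseContainerCommandHandler())
    .addHandler(new DeleteBlocksCommandHandler(containerSet, conf))
    .setContainer(ozoneContainer)
    .setConnectionManager(connectionManager)
    .setContext(stateContext)
    .build(); // fails fast if the container, connection manager or context is missing

// Later, for every command received from SCM:
dispatcher.handle(scmCommand);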

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
deleted file mode 100644
index 1ea0ea8..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
-
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
-import org.apache.hadoop.ozone.container.common.statemachine
-    .SCMConnectionManager;
-import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
-import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
-import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
-import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
-import org.slf4j.Logger;
-
-import java.util.function.Consumer;
-
-/**
- * Generic interface for handlers.
- */
-public interface CommandHandler {
-
-  /**
-   * Handles a given SCM command.
-   * @param command - SCM Command
-   * @param container - Ozone Container.
-   * @param context - Current Context.
-   * @param connectionManager - The SCMs that we are talking to.
-   */
-  void handle(SCMCommand command, OzoneContainer container,
-      StateContext context, SCMConnectionManager connectionManager);
-
-  /**
-   * Returns the command type that this command handler handles.
-   * @return Type
-   */
-  SCMCommandProto.Type getCommandType();
-
-  /**
-   * Returns number of times this handler has been invoked.
-   * @return int
-   */
-  int getInvocationCount();
-
-  /**
-   * Returns the average time this function takes to run.
-   * @return  long
-   */
-  long getAverageRunTime();
-
-  /**
-   * Default implementation for updating command status.
-   */
-  default void updateCommandStatus(StateContext context, SCMCommand command,
-      Consumer<CommandStatus> cmdStatusUpdater, Logger log) {
-    if (!context.updateCommandStatus(command.getId(), cmdStatusUpdater)) {
-      log.debug("{} with Id:{} not found.", command.getType(),
-          command.getId());
-    }
-  }
-}
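
A skeletal implementation of the interface, showing the bookkeeping contract the concrete handlers in this package follow; NoOpCommandHandler and its chosen command type are hypothetical, not part of the codebase:

public class NoOpCommandHandler implements CommandHandler {
  private static final Logger LOG =
      LoggerFactory.getLogger(NoOpCommandHandler.class);
  private int invocationCount;
  private long totalTime;

  @Override
  public void handle(SCMCommand command, OzoneContainer container,
      StateContext context, SCMConnectionManager connectionManager) {
    long start = Time.monotonicNow();
    invocationCount++;
    try {
      // A real handler does its work here.
    } finally {
      // Record the outcome so it is reported back to SCM on heartbeat.
      updateCommandStatus(context, command,
          cmdStatus -> cmdStatus.setStatus(true), LOG);
      totalTime += Time.monotonicNow() - start;
    }
  }

  @Override
  public SCMCommandProto.Type getCommandType() {
    return SCMCommandProto.Type.closeContainerCommand; // placeholder type
  }

  @Override
  public int getInvocationCount() {
    return invocationCount;
  }

  @Override
  public long getAverageRunTime() {
    return invocationCount > 0 ? totalTime / invocationCount : 0;
  }
}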

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
deleted file mode 100644
index aa63fb4..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
-
-import com.google.common.primitives.Longs;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
-import org.apache.hadoop.hdds.scm.container.common.helpers
-    .StorageContainerException;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto
-    .DeleteBlockTransactionResult;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.container.common.helpers
-    .DeletedContainerBlocksSummary;
-import org.apache.hadoop.ozone.container.common.interfaces.Container;
-import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
-import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
-import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
-import org.apache.hadoop.ozone.container.common.statemachine
-    .SCMConnectionManager;
-import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
-import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
-import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
-import org.apache.hadoop.ozone.protocol.commands.DeleteBlockCommandStatus;
-import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
-import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
-import org.apache.hadoop.util.Time;
-import org.apache.hadoop.utils.BatchOperation;
-import org.apache.hadoop.utils.MetadataStore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.function.Consumer;
-
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.CONTAINER_NOT_FOUND;
-
-/**
- * Handle block deletion commands.
- */
-public class DeleteBlocksCommandHandler implements CommandHandler {
-
-  public static final Logger LOG =
-      LoggerFactory.getLogger(DeleteBlocksCommandHandler.class);
-
-  private final ContainerSet containerSet;
-  private final Configuration conf;
-  private int invocationCount;
-  private long totalTime;
-  private boolean cmdExecuted;
-
-  public DeleteBlocksCommandHandler(ContainerSet cset,
-      Configuration conf) {
-    this.containerSet = cset;
-    this.conf = conf;
-  }
-
-  @Override
-  public void handle(SCMCommand command, OzoneContainer container,
-      StateContext context, SCMConnectionManager connectionManager) {
-    cmdExecuted = false;
-    long startTime = Time.monotonicNow();
-    ContainerBlocksDeletionACKProto blockDeletionACK = null;
-    try {
-      if (command.getType() != SCMCommandProto.Type.deleteBlocksCommand) {
-        LOG.warn("Skipping handling command, expected command "
-                + "type {} but found {}",
-            SCMCommandProto.Type.deleteBlocksCommand, command.getType());
-        return;
-      }
-      LOG.debug("Processing block deletion command.");
-      invocationCount++;
-
-      // move blocks to deleting state.
-      // this is a metadata update, the actual deletion happens in another
-      // recycling thread.
-      DeleteBlocksCommand cmd = (DeleteBlocksCommand) command;
-      List<DeletedBlocksTransaction> containerBlocks = cmd.blocksTobeDeleted();
-
-      DeletedContainerBlocksSummary summary =
-          DeletedContainerBlocksSummary.getFrom(containerBlocks);
-      LOG.info("Start to delete container blocks, TXIDs={}, "
-              + "numOfContainers={}, numOfBlocks={}",
-          summary.getTxIDSummary(),
-          summary.getNumOfContainers(),
-          summary.getNumOfBlocks());
-
-      ContainerBlocksDeletionACKProto.Builder resultBuilder =
-          ContainerBlocksDeletionACKProto.newBuilder();
-      containerBlocks.forEach(entry -> {
-        DeleteBlockTransactionResult.Builder txResultBuilder =
-            DeleteBlockTransactionResult.newBuilder();
-        txResultBuilder.setTxID(entry.getTxID());
-        long containerId = entry.getContainerID();
-        try {
-          Container cont = containerSet.getContainer(containerId);
-          if (cont == null) {
-            throw new StorageContainerException("Unable to find the container "
-                + containerId, CONTAINER_NOT_FOUND);
-          }
-          ContainerProtos.ContainerType containerType = cont.getContainerType();
-          switch (containerType) {
-          case KeyValueContainer:
-            KeyValueContainerData containerData = (KeyValueContainerData)
-                cont.getContainerData();
-            deleteKeyValueContainerBlocks(containerData, entry);
-            txResultBuilder.setContainerID(containerId)
-                .setSuccess(true);
-            break;
-          default:
-            LOG.error(
-                "Delete Blocks Command Handler is not implemented for " +
-                    "containerType {}", containerType);
-          }
-        } catch (IOException e) {
-          LOG.warn("Failed to delete blocks for container={}, TXID={}",
-              entry.getContainerID(), entry.getTxID(), e);
-          txResultBuilder.setContainerID(containerId)
-              .setSuccess(false);
-        }
-        resultBuilder.addResults(txResultBuilder.build())
-            .setDnId(context.getParent().getDatanodeDetails()
-                .getUuid().toString());
-      });
-      blockDeletionACK = resultBuilder.build();
-
-      // Send ACK back to SCM as long as meta updated
-      // TODO Or we should wait until the blocks are actually deleted?
-      if (!containerBlocks.isEmpty()) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Sending following block deletion ACK to SCM");
-          for (DeleteBlockTransactionResult result : blockDeletionACK
-              .getResultsList()) {
-            LOG.debug(result.getTxID() + " : " + result.getSuccess());
-          }
-        }
-      }
-      cmdExecuted = true;
-    } finally {
-      final ContainerBlocksDeletionACKProto deleteAck =
-          blockDeletionACK;
-      Consumer<CommandStatus> statusUpdater = (cmdStatus) -> {
-        cmdStatus.setStatus(cmdExecuted);
-        ((DeleteBlockCommandStatus) cmdStatus).setBlocksDeletionAck(deleteAck);
-      };
-      updateCommandStatus(context, command, statusUpdater, LOG);
-      long endTime = Time.monotonicNow();
-      totalTime += endTime - startTime;
-    }
-  }
-
-  /**
-   * Move a bunch of blocks from a container to deleting state. This is a meta
-   * update, the actual deletes happen in async mode.
-   *
-   * @param containerData - KeyValueContainerData
-   * @param delTX a block deletion transaction.
-   * @throws IOException if I/O error occurs.
-   */
-  private void deleteKeyValueContainerBlocks(
-      KeyValueContainerData containerData, DeletedBlocksTransaction delTX)
-      throws IOException {
-    long containerId = delTX.getContainerID();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Processing Container : {}, DB path : {}", containerId,
-          containerData.getMetadataPath());
-    }
-
-    if (delTX.getTxID() < containerData.getDeleteTransactionId()) {
-      LOG.debug(String.format("Ignoring delete blocks for containerId: %d."
-              + " Outdated delete transactionId %d < %d", containerId,
-          delTX.getTxID(), containerData.getDeleteTransactionId()));
-      return;
-    }
-
-    int newDeletionBlocks = 0;
-    MetadataStore containerDB = BlockUtils.getDB(containerData, conf);
-    for (Long blk : delTX.getLocalIDList()) {
-      BatchOperation batch = new BatchOperation();
-      byte[] blkBytes = Longs.toByteArray(blk);
-      byte[] blkInfo = containerDB.get(blkBytes);
-      if (blkInfo != null) {
-        byte[] deletingKeyBytes =
-            DFSUtil.string2Bytes(OzoneConsts.DELETING_KEY_PREFIX + blk);
-        byte[] deletedKeyBytes =
-            DFSUtil.string2Bytes(OzoneConsts.DELETED_KEY_PREFIX + blk);
-        if (containerDB.get(deletingKeyBytes) != null
-            || containerDB.get(deletedKeyBytes) != null) {
-          LOG.debug(String.format(
-              "Ignoring delete for block %d in container %d."
-                  + " Entry already added.", blk, containerId));
-          continue;
-        }
-        // Found the block in container db,
-        // use an atomic update to change its state to deleting.
-        batch.put(deletingKeyBytes, blkInfo);
-        batch.delete(blkBytes);
-        try {
-          containerDB.writeBatch(batch);
-          newDeletionBlocks++;
-          LOG.debug("Transited Block {} to DELETING state in container {}",
-              blk, containerId);
-        } catch (IOException e) {
-          // if some blocks failed to delete, we fail this TX,
-          // without sending this ACK to SCM, SCM will resend the TX
-          // with a certain number of retries.
-          throw new IOException(
-              "Failed to delete blocks for TXID = " + delTX.getTxID(), e);
-        }
-      } else {
-        LOG.debug("Block {} not found or already under deletion in"
-            + " container {}, skip deleting it.", blk, containerId);
-      }
-    }
-
-    containerDB
-        .put(DFSUtil.string2Bytes(OzoneConsts.DELETE_TRANSACTION_KEY_PREFIX),
-            Longs.toByteArray(delTX.getTxID()));
-    containerData
-        .updateDeleteTransactionId(delTX.getTxID());
-    // update pending deletion blocks count in in-memory container status
-    containerData.incrPendingDeletionBlocks(newDeletionBlocks);
-  }
-
-  @Override
-  public SCMCommandProto.Type getCommandType() {
-    return SCMCommandProto.Type.deleteBlocksCommand;
-  }
-
-  @Override
-  public int getInvocationCount() {
-    return this.invocationCount;
-  }
-
-  @Override
-  public long getAverageRunTime() {
-    if (invocationCount > 0) {
-      return totalTime / invocationCount;
-    }
-    return 0;
-  }
-}
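
The per-block state change in deleteKeyValueContainerBlocks() reduces to one atomic batch against the container DB: the raw block key is replaced by a copy under the deleting prefix. A condensed sketch of just that step, with the duplicate-entry and error handling elided (`containerDB` and `blk` as in the method above):

// Sketch: move one block (local id blk) to the deleting state.
byte[] blkBytes = Longs.toByteArray(blk);
byte[] blkInfo = containerDB.get(blkBytes);
if (blkInfo != null) {
  BatchOperation batch = new BatchOperation();
  batch.put(DFSUtil.string2Bytes(OzoneConsts.DELETING_KEY_PREFIX + blk),
      blkInfo);                  // marker key carries the block info
  batch.delete(blkBytes);        // remove the original key
  containerDB.writeBatch(batch); // both changes commit atomically
}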

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
deleted file mode 100644
index 81d162d..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
-
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
-import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
-import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
-import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
-import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor;
-import org.apache.hadoop.ozone.container.replication.ReplicationTask;
-import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
-import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
-
-import com.google.common.base.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Command handler to copy containers from sources.
- */
-public class ReplicateContainerCommandHandler implements CommandHandler {
-
-  static final Logger LOG =
-      LoggerFactory.getLogger(ReplicateContainerCommandHandler.class);
-
-  private int invocationCount;
-
-  private long totalTime;
-
-  private Configuration conf;
-
-  private ReplicationSupervisor supervisor;
-
-  public ReplicateContainerCommandHandler(
-      Configuration conf,
-      ReplicationSupervisor supervisor) {
-    this.conf = conf;
-    this.supervisor = supervisor;
-  }
-
-  @Override
-  public void handle(SCMCommand command, OzoneContainer container,
-      StateContext context, SCMConnectionManager connectionManager) {
-
-    ReplicateContainerCommand replicateCommand =
-        (ReplicateContainerCommand) command;
-    try {
-      List<DatanodeDetails> sourceDatanodes =
-          replicateCommand.getSourceDatanodes();
-      long containerID = replicateCommand.getContainerID();
-
-      Preconditions.checkArgument(sourceDatanodes.size() > 0,
-          String.format("Replication command is received for container %d "
-              + "but the size of source datanodes was 0.", containerID));
-
-      ReplicationTask replicationTask =
-          new ReplicationTask(containerID, sourceDatanodes);
-      supervisor.addTask(replicationTask);
-
-    } finally {
-      updateCommandStatus(context, command,
-          (cmdStatus) -> cmdStatus.setStatus(true), LOG);
-    }
-  }
-
-  @Override
-  public SCMCommandProto.Type getCommandType() {
-    return Type.replicateContainerCommand;
-  }
-
-  @Override
-  public int getInvocationCount() {
-    return this.invocationCount;
-  }
-
-  @Override
-  public long getAverageRunTime() {
-    if (invocationCount > 0) {
-      return totalTime / invocationCount;
-    }
-    return 0;
-  }
-}
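
A usage sketch for the handler above, assuming conf, supervisor and the other arguments already exist; handle() only validates the command and enqueues a ReplicationTask, so it returns as soon as the supervisor has accepted the task:

ReplicateContainerCommandHandler handler =
    new ReplicateContainerCommandHandler(conf, supervisor);

handler.handle(replicateCommand, ozoneContainer, stateContext,
    connectionManager);
// The ReplicationSupervisor now owns the copy; the command status has
// already been marked successful in the finally block of handle().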

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/package-info.java
deleted file mode 100644
index 1e9c8dc..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/package-info.java
+++ /dev/null
@@ -1,18 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/package-info.java
deleted file mode 100644
index feb2f81..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/package-info.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.ozone.container.common.statemachine;
-/**
-
- The state machine class is used by the container to denote the various
- states a container can be in, and is also used for command processing.
-
- A container has the following states:
-
- Start -> getVersion -> Register -> Running -> Shutdown
-
- */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/DatanodeState.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/DatanodeState.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/DatanodeState.java
deleted file mode 100644
index 75142af..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/DatanodeState.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.ozone.container.common.states;
-
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * State Interface that allows tasks to maintain states.
- */
-public interface DatanodeState<T> {
-  /**
-   * Called before entering this state.
-   */
-  void onEnter();
-
-  /**
-   * Called after exiting this state.
-   */
-  void onExit();
-
-  /**
-   * Executes one or more tasks that are needed by this state.
-   *
-   * @param executor -  ExecutorService
-   */
-  void execute(ExecutorService executor);
-
-  /**
-   * Wait for execute to finish.
-   *
-   * @param time - Time
-   * @param timeUnit - Unit of time.
-   */
-  T await(long time, TimeUnit timeUnit)
-      throws InterruptedException, ExecutionException, TimeoutException;
-
-}
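
A minimal sketch of an implementation backed by a Future, the same pattern InitDatanodeState below uses for execute()/await(); SingleTaskState is a hypothetical example class:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SingleTaskState implements DatanodeState<String> {
  private Future<String> result;

  @Override
  public void onEnter() {
    // One-time setup before the first execute() in this state.
  }

  @Override
  public void onExit() {
    // One-time cleanup when leaving this state.
  }

  @Override
  public void execute(ExecutorService executor) {
    // Submit the state's work; await() blocks on the same future.
    result = executor.submit(() -> "done");
  }

  @Override
  public String await(long time, TimeUnit timeUnit)
      throws InterruptedException, ExecutionException, TimeoutException {
    return result.get(time, timeUnit);
  }
}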

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java
deleted file mode 100644
index 995f172..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.ozone.container.common.states.datanode;
-
-import com.google.common.base.Strings;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.HddsUtils;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
-import org.apache.hadoop.ozone.container.common.statemachine
-    .DatanodeStateMachine;
-import org.apache.hadoop.ozone.container.common.statemachine
-    .SCMConnectionManager;
-import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
-import org.apache.hadoop.ozone.container.common.states.DatanodeState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Collection;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static org.apache.hadoop.hdds.HddsUtils.getSCMAddresses;
-
-/**
- * Init Datanode State is the task that gets run when we are in Init State.
- */
-public class InitDatanodeState implements DatanodeState,
-    Callable<DatanodeStateMachine.DatanodeStates> {
-  static final Logger LOG = LoggerFactory.getLogger(InitDatanodeState.class);
-  private final SCMConnectionManager connectionManager;
-  private final Configuration conf;
-  private final StateContext context;
-  private Future<DatanodeStateMachine.DatanodeStates> result;
-
-  /**
-   *  Create InitDatanodeState Task.
-   *
-   * @param conf - Conf
-   * @param connectionManager - Connection Manager
-   * @param context - Current Context
-   */
-  public InitDatanodeState(Configuration conf,
-                           SCMConnectionManager connectionManager,
-                           StateContext context) {
-    this.conf = conf;
-    this.connectionManager = connectionManager;
-    this.context = context;
-  }
-
-  /**
-   * Computes a result, or throws an exception if unable to do so.
-   *
-   * @return computed result
-   * @throws Exception if unable to compute a result
-   */
-  @Override
-  public DatanodeStateMachine.DatanodeStates call() throws Exception {
-    Collection<InetSocketAddress> addresses = null;
-    try {
-      addresses = getSCMAddresses(conf);
-    } catch (IllegalArgumentException e) {
-      if (!Strings.isNullOrEmpty(e.getMessage())) {
-        LOG.error("Failed to get SCM addresses: " + e.getMessage());
-      }
-      return DatanodeStateMachine.DatanodeStates.SHUTDOWN;
-    }
-
-    if (addresses == null || addresses.isEmpty()) {
-      LOG.error("Null or empty SCM address list found.");
-      return DatanodeStateMachine.DatanodeStates.SHUTDOWN;
-    } else {
-      for (InetSocketAddress addr : addresses) {
-        if (addr.isUnresolved()) {
-          LOG.warn("One SCM address ({}) can't (yet?) be resolved. Postpone "
-              + "initialization.", addr);
-
-          // Skip any further initialization; DatanodeStateMachine will retry
-          // after the heartbeat interval.
-          return this.context.getState();
-        }
-      }
-      for (InetSocketAddress addr : addresses) {
-        connectionManager.addSCMServer(addr);
-      }
-    }
-
-    // If datanode ID is set, persist it to the ID file.
-    persistContainerDatanodeDetails();
-
-    return this.context.getState().getNextState();
-  }
-
-  /**
-   * Persist DatanodeDetails to datanode.id file.
-   */
-  private void persistContainerDatanodeDetails() {
-    String dataNodeIDPath = HddsUtils.getDatanodeIdFilePath(conf);
-    if (Strings.isNullOrEmpty(dataNodeIDPath)) {
-      LOG.error("A valid file path is needed for config setting {}",
-          ScmConfigKeys.OZONE_SCM_DATANODE_ID);
-      this.context.setState(DatanodeStateMachine.DatanodeStates.SHUTDOWN);
-      return;
-    }
-    File idPath = new File(dataNodeIDPath);
-    DatanodeDetails datanodeDetails = this.context.getParent()
-        .getDatanodeDetails();
-    if (datanodeDetails != null && !idPath.exists()) {
-      try {
-        ContainerUtils.writeDatanodeDetailsTo(datanodeDetails, idPath);
-      } catch (IOException ex) {
-        // Writing DatanodeDetails to the datanode.id file failed. This is
-        // critical, so shut down the state machine.
-        LOG.error("Writing to {} failed {}", dataNodeIDPath, ex.getMessage());
-        this.context.setState(DatanodeStateMachine.DatanodeStates.SHUTDOWN);
-        return;
-      }
-      LOG.info("DatanodeDetails is persisted to {}", dataNodeIDPath);
-    }
-  }
-
-  /**
-   * Called before entering this state.
-   */
-  @Override
-  public void onEnter() {
-    LOG.trace("Entering init container state");
-  }
-
-  /**
-   * Called after exiting this state.
-   */
-  @Override
-  public void onExit() {
-    LOG.trace("Exiting init container state");
-  }
-
-  /**
-   * Executes one or more tasks that are needed by this state.
-   *
-   * @param executor -  ExecutorService
-   */
-  @Override
-  public void execute(ExecutorService executor) {
-    result = executor.submit(this);
-  }
-
-  /**
-   * Wait for execute to finish.
-   *
-   * @param time     - Time
-   * @param timeUnit - Unit of time.
-   */
-  @Override
-  public DatanodeStateMachine.DatanodeStates await(long time,
-      TimeUnit timeUnit) throws InterruptedException,
-      ExecutionException, TimeoutException {
-    return result.get(time, timeUnit);
-  }
-}
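
The task above is driven by the enclosing state machine roughly as follows
(a simplified sketch of the calling pattern, not code from this commit; the
real DatanodeStateMachine loop differs in detail):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Simplified driver: one onEnter/execute/await/onExit round for the task.
final class InitStateDriverSketch {
  static DatanodeStateMachine.DatanodeStates runOnce(
      InitDatanodeState task, long heartbeatMillis) throws Exception {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      task.onEnter();
      task.execute(executor);              // submits task.call() as a Future
      return task.await(heartbeatMillis,   // joins with a timeout
          TimeUnit.MILLISECONDS);
    } finally {
      task.onExit();
      executor.shutdown();
    }
  }
}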

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java
deleted file mode 100644
index ec2358a..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.ozone.container.common.states.datanode;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
-import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
-import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
-import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
-import org.apache.hadoop.ozone.container.common.states.DatanodeState;
-import org.apache.hadoop.ozone.container.common.states.endpoint.HeartbeatEndpointTask;
-import org.apache.hadoop.ozone.container.common.states.endpoint.RegisterEndpointTask;
-import org.apache.hadoop.ozone.container.common.states.endpoint.VersionEndpointTask;
-import org.apache.hadoop.util.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * Class that implements handshake with SCM.
- */
-public class RunningDatanodeState implements DatanodeState {
-  static final Logger
-      LOG = LoggerFactory.getLogger(RunningDatanodeState.class);
-  private final SCMConnectionManager connectionManager;
-  private final Configuration conf;
-  private final StateContext context;
-  private CompletionService<EndpointStateMachine.EndPointStates> ecs;
-
-  public RunningDatanodeState(Configuration conf,
-      SCMConnectionManager connectionManager,
-      StateContext context) {
-    this.connectionManager = connectionManager;
-    this.conf = conf;
-    this.context = context;
-  }
-
-  /**
-   * Called before entering this state.
-   */
-  @Override
-  public void onEnter() {
-    LOG.trace("Entering handshake task.");
-  }
-
-  /**
-   * Called After exiting this state.
-   */
-  @Override
-  public void onExit() {
-    LOG.trace("Exiting handshake task.");
-  }
-
-  /**
-   * Executes one or more tasks that are needed by this state.
-   *
-   * @param executor -  ExecutorService
-   */
-  @Override
-  public void execute(ExecutorService executor) {
-    ecs = new ExecutorCompletionService<>(executor);
-    for (EndpointStateMachine endpoint : connectionManager.getValues()) {
-      Callable<EndpointStateMachine.EndPointStates> endpointTask
-          = getEndPointTask(endpoint);
-      ecs.submit(endpointTask);
-    }
-  }
-  // TODO: Cache some of these tasks instead of creating them
-  // all the time.
-  private Callable<EndpointStateMachine.EndPointStates>
-      getEndPointTask(EndpointStateMachine endpoint) {
-    switch (endpoint.getState()) {
-    case GETVERSION:
-      return new VersionEndpointTask(endpoint, conf, context.getParent()
-          .getContainer());
-    case REGISTER:
-      return  RegisterEndpointTask.newBuilder()
-          .setConfig(conf)
-          .setEndpointStateMachine(endpoint)
-          .setContext(context)
-          .setDatanodeDetails(context.getParent().getDatanodeDetails())
-          .setOzoneContainer(context.getParent().getContainer())
-          .build();
-    case HEARTBEAT:
-      return HeartbeatEndpointTask.newBuilder()
-          .setConfig(conf)
-          .setEndpointStateMachine(endpoint)
-          .setDatanodeDetails(context.getParent().getDatanodeDetails())
-          .setContext(context)
-          .build();
-    case SHUTDOWN:
-      break;
-    default:
-      throw new IllegalArgumentException("Illegal Argument.");
-    }
-    return null;
-  }
-
-  /**
-   * Computes the next state the container state machine must move to by
-   * looking at the states of all endpoints.
-   * <p>
-   * If any endpoint state has moved to Shutdown, either we have an
-   * unrecoverable error or we have been told to shut down. In either case
-   * the datanode state machine should move to the Shutdown state; otherwise
-   * we remain in the Running state.
-   *
-   * @return next container state.
-   */
-  private DatanodeStateMachine.DatanodeStates
-      computeNextContainerState(
-      List<Future<EndpointStateMachine.EndPointStates>> results) {
-    for (Future<EndpointStateMachine.EndPointStates> state : results) {
-      try {
-        if (state.get() == EndpointStateMachine.EndPointStates.SHUTDOWN) {
-          // if any endpoint tells us to shutdown we move to shutdown state.
-          return DatanodeStateMachine.DatanodeStates.SHUTDOWN;
-        }
-      } catch (InterruptedException | ExecutionException e) {
-        LOG.error("Error in executing end point task.", e);
-      }
-    }
-    return DatanodeStateMachine.DatanodeStates.RUNNING;
-  }
-
-  /**
-   * Wait for execute to finish.
-   *
-   * @param duration - Time
-   * @param timeUnit - Unit of duration.
-   */
-  @Override
-  public DatanodeStateMachine.DatanodeStates
-      await(long duration, TimeUnit timeUnit)
-      throws InterruptedException, ExecutionException, TimeoutException {
-    int count = connectionManager.getValues().size();
-    int returned = 0;
-    long timeLeft = timeUnit.toMillis(duration);
-    long startTime = Time.monotonicNow();
-    List<Future<EndpointStateMachine.EndPointStates>> results = new
-        LinkedList<>();
-
-    while (returned < count && timeLeft > 0) {
-      Future<EndpointStateMachine.EndPointStates> result =
-          ecs.poll(timeLeft, TimeUnit.MILLISECONDS);
-      if (result != null) {
-        results.add(result);
-        returned++;
-      }
-      // Recompute the remaining budget from the fixed start time.
-      timeLeft = timeUnit.toMillis(duration) - (Time.monotonicNow() - startTime);
-    }
-    return computeNextContainerState(results);
-  }
-}
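
The await() above is an instance of the timed-drain idiom over an
ExecutorCompletionService: poll until every submitted endpoint task returns
or the overall budget lapses, recomputing the remaining time against a fixed
start. Stripped of the Ozone types, the pattern looks like this (a generic
sketch, not project code):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Generic timed drain: collect up to 'count' completed futures within the
// given overall duration, however the individual tasks are interleaved.
final class TimedDrain {
  static <T> List<Future<T>> drain(CompletionService<T> ecs, int count,
      long duration, TimeUnit unit) throws InterruptedException {
    List<Future<T>> done = new ArrayList<>();
    long deadline = System.nanoTime() + unit.toNanos(duration);
    while (done.size() < count) {
      long left = deadline - System.nanoTime();
      if (left <= 0) {
        break;                              // overall budget exhausted
      }
      Future<T> f = ecs.poll(left, TimeUnit.NANOSECONDS);
      if (f != null) {
        done.add(f);                        // one more task finished
      }
    }
    return done;
  }

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    CompletionService<String> ecs = new ExecutorCompletionService<>(pool);
    ecs.submit(() -> "a");
    ecs.submit(() -> "b");
    System.out.println(drain(ecs, 2, 1, TimeUnit.SECONDS).size()); // 2
    pool.shutdown();
  }
}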

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/package-info.java
deleted file mode 100644
index 6b8d16c..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.ozone.container.common.states.datanode;
-/**
- This package contains files that guide the state transitions from
- Init->Running->Shutdown for the datanode.
- */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
deleted file mode 100644
index 4fd72ec..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ /dev/null
@@ -1,385 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.states.endpoint;
-
-import com.google.common.base.Preconditions;
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.GeneratedMessage;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.PipelineActionsProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.PipelineAction;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerActionsProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerAction;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
-import org.apache.hadoop.ozone.container.common.helpers
-    .DeletedContainerBlocksSummary;
-import org.apache.hadoop.ozone.container.common.statemachine
-    .EndpointStateMachine;
-import org.apache.hadoop.ozone.container.common.statemachine
-    .EndpointStateMachine.EndPointStates;
-import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
-import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
-import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
-import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.time.ZonedDateTime;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.Callable;
-
-import static org.apache.hadoop.hdds.HddsConfigKeys
-    .HDDS_CONTAINER_ACTION_MAX_LIMIT;
-import static org.apache.hadoop.hdds.HddsConfigKeys
-    .HDDS_CONTAINER_ACTION_MAX_LIMIT_DEFAULT;
-import static org.apache.hadoop.hdds.HddsConfigKeys
-    .HDDS_PIPELINE_ACTION_MAX_LIMIT;
-import static org.apache.hadoop.hdds.HddsConfigKeys
-    .HDDS_PIPELINE_ACTION_MAX_LIMIT_DEFAULT;
-
-/**
- * Heartbeat class for SCMs.
- */
-public class HeartbeatEndpointTask
-    implements Callable<EndpointStateMachine.EndPointStates> {
-  static final Logger LOG =
-      LoggerFactory.getLogger(HeartbeatEndpointTask.class);
-  private final EndpointStateMachine rpcEndpoint;
-  private final Configuration conf;
-  private DatanodeDetailsProto datanodeDetailsProto;
-  private StateContext context;
-  private int maxContainerActionsPerHB;
-  private int maxPipelineActionsPerHB;
-
-  /**
-   * Constructs an SCM heartbeat task.
-   *
-   * @param conf Config.
-   */
-  public HeartbeatEndpointTask(EndpointStateMachine rpcEndpoint,
-      Configuration conf, StateContext context) {
-    this.rpcEndpoint = rpcEndpoint;
-    this.conf = conf;
-    this.context = context;
-    this.maxContainerActionsPerHB = conf.getInt(HDDS_CONTAINER_ACTION_MAX_LIMIT,
-        HDDS_CONTAINER_ACTION_MAX_LIMIT_DEFAULT);
-    this.maxPipelineActionsPerHB = conf.getInt(HDDS_PIPELINE_ACTION_MAX_LIMIT,
-        HDDS_PIPELINE_ACTION_MAX_LIMIT_DEFAULT);
-  }
-
-  /**
-   * Get the datanode details proto.
-   *
-   * @return DatanodeDetailsProto
-   */
-  public DatanodeDetailsProto getDatanodeDetailsProto() {
-    return datanodeDetailsProto;
-  }
-
-  /**
-   * Set the datanode details proto.
-   *
-   * @param datanodeDetailsProto - the datanode details.
-   */
-  public void setDatanodeDetailsProto(DatanodeDetailsProto
-      datanodeDetailsProto) {
-    this.datanodeDetailsProto = datanodeDetailsProto;
-  }
-
-  /**
-   * Computes a result, or throws an exception if unable to do so.
-   *
-   * @return computed result
-   * @throws Exception if unable to compute a result
-   */
-  @Override
-  public EndpointStateMachine.EndPointStates call() throws Exception {
-    rpcEndpoint.lock();
-    SCMHeartbeatRequestProto.Builder requestBuilder = null;
-    try {
-      Preconditions.checkState(this.datanodeDetailsProto != null);
-
-      requestBuilder = SCMHeartbeatRequestProto.newBuilder()
-          .setDatanodeDetails(datanodeDetailsProto);
-      addReports(requestBuilder);
-      addContainerActions(requestBuilder);
-      addPipelineActions(requestBuilder);
-      SCMHeartbeatResponseProto response = rpcEndpoint.getEndPoint()
-          .sendHeartbeat(requestBuilder.build());
-      processResponse(response, datanodeDetailsProto);
-      rpcEndpoint.setLastSuccessfulHeartbeat(ZonedDateTime.now());
-      rpcEndpoint.zeroMissedCount();
-    } catch (IOException ex) {
-      // put back the reports which failed to be sent
-      putBackReports(requestBuilder);
-      rpcEndpoint.logIfNeeded(ex);
-    } finally {
-      rpcEndpoint.unlock();
-    }
-    return rpcEndpoint.getState();
-  }
-
-  // TODO: Make it generic.
-  private void putBackReports(SCMHeartbeatRequestProto.Builder requestBuilder) {
-    List<GeneratedMessage> reports = new LinkedList<>();
-    if (requestBuilder.hasContainerReport()) {
-      reports.add(requestBuilder.getContainerReport());
-    }
-    if (requestBuilder.hasNodeReport()) {
-      reports.add(requestBuilder.getNodeReport());
-    }
-    if (requestBuilder.getCommandStatusReportsCount() != 0) {
-      for (GeneratedMessage msg : requestBuilder
-          .getCommandStatusReportsList()) {
-        reports.add(msg);
-      }
-    }
-    context.putBackReports(reports);
-  }
-
-  /**
-   * Adds all the available reports to heartbeat.
-   *
-   * @param requestBuilder builder to which the report has to be added.
-   */
-  private void addReports(SCMHeartbeatRequestProto.Builder requestBuilder) {
-    for (GeneratedMessage report : context.getAllAvailableReports()) {
-      String reportName = report.getDescriptorForType().getFullName();
-      for (Descriptors.FieldDescriptor descriptor :
-          SCMHeartbeatRequestProto.getDescriptor().getFields()) {
-        String heartbeatFieldName = descriptor.getMessageType().getFullName();
-        if (heartbeatFieldName.equals(reportName)) {
-          if (descriptor.isRepeated()) {
-            requestBuilder.addRepeatedField(descriptor, report);
-          } else {
-            requestBuilder.setField(descriptor, report);
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * Adds all the pending ContainerActions to the heartbeat.
-   *
-   * @param requestBuilder builder to which the report has to be added.
-   */
-  private void addContainerActions(
-      SCMHeartbeatRequestProto.Builder requestBuilder) {
-    List<ContainerAction> actions = context.getPendingContainerAction(
-        maxContainerActionsPerHB);
-    if (!actions.isEmpty()) {
-      ContainerActionsProto cap = ContainerActionsProto.newBuilder()
-          .addAllContainerActions(actions)
-          .build();
-      requestBuilder.setContainerActions(cap);
-    }
-  }
-
-  /**
-   * Adds all the pending PipelineActions to the heartbeat.
-   *
-   * @param requestBuilder builder to which the report has to be added.
-   */
-  private void addPipelineActions(
-      SCMHeartbeatRequestProto.Builder requestBuilder) {
-    List<PipelineAction> actions = context.getPendingPipelineAction(
-        maxPipelineActionsPerHB);
-    if (!actions.isEmpty()) {
-      PipelineActionsProto pap = PipelineActionsProto.newBuilder()
-          .addAllPipelineActions(actions)
-          .build();
-      requestBuilder.setPipelineActions(pap);
-    }
-  }
-
-  /**
-   * Returns a builder class for HeartbeatEndpointTask task.
-   * @return   Builder.
-   */
-  public static Builder newBuilder() {
-    return new Builder();
-  }
-
-  /**
-   * Processes the heartbeat response and adds each command it carries
-   * to the command processing queue.
-   *
-   * @param response - SCMHeartbeat response.
-   */
-  private void processResponse(SCMHeartbeatResponseProto response,
-      final DatanodeDetailsProto datanodeDetails) {
-    // Verify the response is indeed for this datanode.
-    Preconditions.checkState(response.getDatanodeUUID()
-            .equalsIgnoreCase(datanodeDetails.getUuid()),
-        "Unexpected datanode ID in the response.");
-    for (SCMCommandProto commandResponseProto : response
-        .getCommandsList()) {
-      switch (commandResponseProto.getCommandType()) {
-      case reregisterCommand:
-        if (rpcEndpoint.getState() == EndPointStates.HEARTBEAT) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Received SCM notification to register."
-                + " Interrupt HEARTBEAT and transit to REGISTER state.");
-          }
-          rpcEndpoint.setState(EndPointStates.REGISTER);
-        } else {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Illegal state {} found, expecting {}.",
-                rpcEndpoint.getState().name(), EndPointStates.HEARTBEAT);
-          }
-        }
-        break;
-      case deleteBlocksCommand:
-        DeleteBlocksCommand db = DeleteBlocksCommand
-            .getFromProtobuf(
-                commandResponseProto.getDeleteBlocksCommandProto());
-        if (!db.blocksTobeDeleted().isEmpty()) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(DeletedContainerBlocksSummary
-                .getFrom(db.blocksTobeDeleted())
-                .toString());
-          }
-          this.context.addCommand(db);
-        }
-        break;
-      case closeContainerCommand:
-        CloseContainerCommand closeContainer =
-            CloseContainerCommand.getFromProtobuf(
-                commandResponseProto.getCloseContainerCommandProto());
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Received SCM container close request for container {}",
-              closeContainer.getContainerID());
-        }
-        this.context.addCommand(closeContainer);
-        break;
-      case replicateContainerCommand:
-        ReplicateContainerCommand replicateContainerCommand =
-            ReplicateContainerCommand.getFromProtobuf(
-                commandResponseProto.getReplicateContainerCommandProto());
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Received SCM container replicate request for container 
{}",
-              replicateContainerCommand.getContainerID());
-        }
-        this.context.addCommand(replicateContainerCommand);
-        break;
-      default:
-        throw new IllegalArgumentException("Unknown response : "
-            + commandResponseProto.getCommandType().name());
-      }
-    }
-  }
-
-  /**
-   * Builder class for HeartbeatEndpointTask.
-   */
-  public static class Builder {
-    private EndpointStateMachine endPointStateMachine;
-    private Configuration conf;
-    private DatanodeDetails datanodeDetails;
-    private StateContext context;
-
-    /**
-     * Constructs the builder class.
-     */
-    public Builder() {
-    }
-
-    /**
-     * Sets the endpoint state machine.
-     *
-     * @param rpcEndPoint - Endpoint state machine.
-     * @return Builder
-     */
-    public Builder setEndpointStateMachine(EndpointStateMachine rpcEndPoint) {
-      this.endPointStateMachine = rpcEndPoint;
-      return this;
-    }
-
-    /**
-     * Sets the Config.
-     *
-     * @param config - config
-     * @return Builder
-     */
-    public Builder setConfig(Configuration config) {
-      this.conf = config;
-      return this;
-    }
-
-    /**
-     * Sets the datanode details.
-     *
-     * @param dnDetails - DatanodeDetails
-     * @return Builder
-     */
-    public Builder setDatanodeDetails(DatanodeDetails dnDetails) {
-      this.datanodeDetails = dnDetails;
-      return this;
-    }
-
-    /**
-     * Sets the context.
-     * @param stateContext - State context.
-     * @return this.
-     */
-    public Builder setContext(StateContext stateContext) {
-      this.context = stateContext;
-      return this;
-    }
-
-    public HeartbeatEndpointTask build() {
-      if (endPointStateMachine == null) {
-        LOG.error("No endpoint specified.");
-        throw new IllegalArgumentException("A valid endpoint state machine is" 
+
-            " needed to construct HeartbeatEndpointTask task");
-      }
-
-      if (conf == null) {
-        LOG.error("No config specified.");
-        throw new IllegalArgumentException("A valid configration is needed to" 
+
-            " construct HeartbeatEndpointTask task");
-      }
-
-      if (datanodeDetails == null) {
-        LOG.error("No datanode specified.");
-        throw new IllegalArgumentException("A vaild Node ID is needed to " +
-            "construct HeartbeatEndpointTask task");
-      }
-
-      HeartbeatEndpointTask task = new HeartbeatEndpointTask(this
-          .endPointStateMachine, this.conf, this.context);
-      task.setDatanodeDetailsProto(datanodeDetails.getProtoBufMessage());
-      return task;
-    }
-  }
-}
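
Construction goes through the builder shown above; in this commit the real
call site is RunningDatanodeState.getEndPointTask(). A sketch of the usage
(the endpoint, conf, dnDetails and stateContext parameters are assumed to be
supplied by the caller):

// Sketch of a call site for HeartbeatEndpointTask.newBuilder(); all four
// arguments are assumed to exist in the caller's scope.
static EndpointStateMachine.EndPointStates heartbeatOnce(
    EndpointStateMachine endpoint, Configuration conf,
    DatanodeDetails dnDetails, StateContext stateContext) throws Exception {
  HeartbeatEndpointTask task = HeartbeatEndpointTask.newBuilder()
      .setEndpointStateMachine(endpoint)
      .setConfig(conf)
      .setDatanodeDetails(dnDetails)
      .setContext(stateContext)
      .build();
  // call() sends one heartbeat (reports plus pending container/pipeline
  // actions) and queues any SCM commands from the response.
  return task.call();
}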

