This is an automated email from the ASF dual-hosted git repository.

licheng pushed a commit to branch HDDS-2823
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git

commit e1fc4a462aca8cd19aa1d846844d3b2ac0071465
Author: Nandakumar <[email protected]>
AuthorDate: Thu May 28 14:38:58 2020 +0530

    HDDS-3192. Handle AllocateContainer operation for HA. (#975)
---
 .../hdds/scm/container/ContainerManagerImpl.java   |  19 +-
 .../hdds/scm/container/ContainerManagerV2.java     |   5 +-
 .../scm/container/ContainerStateManagerImpl.java   | 140 ++++++-------
 .../scm/container/ContainerStateManagerV2.java     |  76 ++++++-
 .../scm/container/states/ContainerStateMap.java    |   1 +
 .../hadoop/hdds/scm/ha/SCMHAInvocationHandler.java |   6 +-
 .../apache/hadoop/hdds/scm/ha/SCMHAManager.java    |  40 +---
 .../{SCMHAManager.java => SCMHAManagerImpl.java}   |  26 +--
 .../hadoop/hdds/scm/ha/SCMRatisResponse.java       |   8 +
 .../apache/hadoop/hdds/scm/ha/SCMRatisServer.java  |  85 +-------
 ...SCMRatisServer.java => SCMRatisServerImpl.java} |  16 +-
 .../scm/container/TestContainerManagerImpl.java    |  91 ++++++++
 .../hadoop/hdds/scm/ha/MockSCMHAManager.java       | 148 +++++++++++++
 .../hdds/scm/pipeline/MockPipelineManager.java     | 228 +++++++++++++++++++++
 14 files changed, 656 insertions(+), 233 deletions(-)

diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
index 0404530..36b9a30 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
@@ -47,6 +47,10 @@ import org.slf4j.LoggerFactory;
  */
 public class ContainerManagerImpl implements ContainerManagerV2 {
 
+  /*
+   * TODO: Introduce container level locks.
+   */
+
   /**
    *
    */
@@ -72,17 +76,18 @@ public class ContainerManagerImpl implements ContainerManagerV2 {
    *
    */
   public ContainerManagerImpl(
-      // Introduce builder for this class?
-      final Configuration conf, final PipelineManager pipelineManager,
-      final SCMHAManager scmhaManager,
+      final Configuration conf,
+      final SCMHAManager scmHaManager,
+      final PipelineManager pipelineManager,
       final Table<ContainerID, ContainerInfo> containerStore)
       throws IOException {
+    // Introduce builder for this class?
     this.lock = new ReentrantReadWriteLock();
     this.pipelineManager = pipelineManager;
-    this.containerStateManager =  ContainerStateManagerImpl.newBuilder()
+    this.containerStateManager = ContainerStateManagerImpl.newBuilder()
         .setConfiguration(conf)
         .setPipelineManager(pipelineManager)
-        .setRatisServer(scmhaManager.getRatisServer())
+        .setRatisServer(scmHaManager.getRatisServer())
         .setContainerStore(containerStore)
         .build();
   }
@@ -275,8 +280,8 @@ public class ContainerManagerImpl implements ContainerManagerV2 {
   }
 
   @Override
-  public void close() throws IOException {
-    throw new UnsupportedOperationException("Not yet implemented!");
+  public void close() throws Exception {
+    containerStateManager.close();
   }
 
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerV2.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerV2.java
index 37c7b70..863ca4d 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerV2.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerV2.java
@@ -16,7 +16,6 @@
  */
 package org.apache.hadoop.hdds.scm.container;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
@@ -36,8 +35,8 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
  * mapping. This is used by SCM when allocating new locations and when
  * looking up a key.
  */
-public interface ContainerManagerV2 extends Closeable {
-
+public interface ContainerManagerV2 extends AutoCloseable {
+  // TODO: Rename this to ContainerManager
 
   /**
    * Returns all the container Ids managed by ContainerManager.
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java
index 16fe340..4f4456a 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hdds.scm.container;
 
 import java.io.IOException;
-
 import java.lang.reflect.Proxy;
 import java.util.HashSet;
 import java.util.Map;
@@ -27,128 +26,92 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
-import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
-import org.apache.hadoop.hdds.scm.ha.SCMHAInvocationHandler;
-import org.apache.hadoop.hdds.scm.ha.SCMRatisServer;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
-import org.apache.hadoop.hdds.utils.db.Table;
-import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
-import org.apache.hadoop.hdds.utils.db.TableIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.google.common.base.Preconditions;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.states.ContainerState;
 import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
+import org.apache.hadoop.hdds.scm.ha.SCMHAInvocationHandler;
+import org.apache.hadoop.hdds.scm.ha.SCMRatisServer;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
 import org.apache.hadoop.ozone.common.statemachine.StateMachine;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
- * TODO: Add javadoc.
+ * Default implementation of ContainerStateManager. This implementation
+ * holds the Container States in-memory which is backed by a persistent store.
+ * The persistent store is always kept in sync with the in-memory state changes.
  */
 public final class ContainerStateManagerImpl
     implements ContainerStateManagerV2 {
 
-  /* **********************************************************************
-   * Container Life Cycle                                                 *
-   *                                                                      *
-   * Event and State Transition Mapping:                                  *
-   *                                                                      *
-   * State: OPEN         ----------------> CLOSING                        *
-   * Event:                    FINALIZE                                   *
-   *                                                                      *
-   * State: CLOSING      ----------------> QUASI_CLOSED                   *
-   * Event:                  QUASI_CLOSE                                  *
-   *                                                                      *
-   * State: CLOSING      ----------------> CLOSED                         *
-   * Event:                     CLOSE                                     *
-   *                                                                      *
-   * State: QUASI_CLOSED ----------------> CLOSED                         *
-   * Event:                  FORCE_CLOSE                                  *
-   *                                                                      *
-   * State: CLOSED       ----------------> DELETING                       *
-   * Event:                    DELETE                                     *
-   *                                                                      *
-   * State: DELETING     ----------------> DELETED                        *
-   * Event:                    CLEANUP                                    *
-   *                                                                      *
-   *                                                                      *
-   * Container State Flow:                                                *
-   *                                                                      *
-   * [OPEN]--------------->[CLOSING]--------------->[QUASI_CLOSED]        *
-   *          (FINALIZE)      |      (QUASI_CLOSE)        |               *
-   *                          |                           |               *
-   *                          |                           |               *
-   *                  (CLOSE) |             (FORCE_CLOSE) |               *
-   *                          |                           |               *
-   *                          |                           |               *
-   *                          +--------->[CLOSED]<--------+               *
-   *                                        |                             *
-   *                                (DELETE)|                             *
-   *                                        |                             *
-   *                                        |                             *
-   *                                   [DELETING]                         *
-   *                                        |                             *
-   *                              (CLEANUP) |                             *
-   *                                        |                             *
-   *                                        V                             *
-   *                                    [DELETED]                         *
-   *                                                                      *
-   ************************************************************************/
-
   /**
-   *
+   * Logger instance of ContainerStateManagerImpl.
    */
   private static final Logger LOG = LoggerFactory.getLogger(
       ContainerStateManagerImpl.class);
 
   /**
-   *
+   * Configured container size.
    */
   private final long containerSize;
 
   /**
-   *
+   * The container ID sequence which is used to create new container.
+   * This will be removed once we have a Distributed Sequence ID Generator.
    */
+  @Deprecated
   private final AtomicLong nextContainerID;
 
   /**
-   *
+   * In-memory representation of Container States.
    */
   private final ContainerStateMap containers;
 
   /**
-   *
+   * Persistent store for Container States.
    */
-  private final PipelineManager pipelineManager;
+  private Table<ContainerID, ContainerInfo> containerStore;
 
   /**
-   *
+   * PipelineManager instance.
    */
-  private Table<ContainerID, ContainerInfo> containerStore;
+  private final PipelineManager pipelineManager;
 
   /**
-   *
+   * Container lifecycle state machine.
    */
   private final StateMachine<LifeCycleState, LifeCycleEvent> stateMachine;
 
   /**
-   *
+   * We use the containers in round-robin fashion for operations like block
+   * allocation. This map is used for remembering the last used container.
    */
   private final ConcurrentHashMap<ContainerState, ContainerID> lastUsedMap;
 
   /**
+   * Constructs a ContainerStateManagerImpl instance and loads the containers
+   * from the persistent storage.
    *
+   * @param conf the Configuration
+   * @param pipelineManager the {@link PipelineManager} instance
+   * @param containerStore the persistent storage
+   * @throws IOException in case of error while loading the containers
    */
   private ContainerStateManagerImpl(final Configuration conf,
       final PipelineManager pipelineManager,
@@ -158,7 +121,7 @@ public final class ContainerStateManagerImpl
     this.containerStore = containerStore;
     this.stateMachine = newStateMachine();
     this.containerSize = getConfiguredContainerSize(conf);
-    this.nextContainerID = new AtomicLong();
+    this.nextContainerID = new AtomicLong(1L);
     this.containers = new ContainerStateMap();
     this.lastUsedMap = new ConcurrentHashMap<>();
 
@@ -166,7 +129,9 @@ public final class ContainerStateManagerImpl
   }
 
   /**
+   * Creates and initializes a new Container Lifecycle StateMachine.
    *
+   * @return the Container Lifecycle StateMachine
    */
   private StateMachine<LifeCycleState, LifeCycleEvent> newStateMachine() {
 
@@ -208,7 +173,9 @@ public final class ContainerStateManagerImpl
   }
 
   /**
+   * Returns the configured container size.
    *
+   * @return the max size of container
    */
   private long getConfiguredContainerSize(final Configuration conf) {
     return (long) conf.getStorageSize(
@@ -218,7 +185,9 @@ public final class ContainerStateManagerImpl
   }
 
   /**
+   * Loads the containers from container store into memory.
    *
+   * @throws IOException in case of error while loading the containers
    */
   private void initialize() throws IOException {
     TableIterator<ContainerID, ? extends KeyValue<ContainerID, ContainerInfo>>
@@ -282,16 +251,20 @@ public final class ContainerStateManagerImpl
 
     Preconditions.checkNotNull(containerInfo);
     final ContainerInfo container = ContainerInfo.fromProtobuf(containerInfo);
-    if (getContainer(container.containerID()) == null) {
-      Preconditions.checkArgument(nextContainerID.get()
-              == container.containerID().getId(),
-          "ContainerID mismatch.");
-
-      pipelineManager.addContainerToPipeline(
-          container.getPipelineID(), container.containerID());
-      containers.addContainer(container);
-      nextContainerID.incrementAndGet();
-    }
+    final ContainerID containerID = container.containerID();
+    final PipelineID pipelineID = container.getPipelineID();
+
+    /*
+     * TODO:
+     *  Check if the container already exists in ContainerStateManager.
+     *  This optimization can be done after moving ContainerNotFoundException
+     *  from ContainerStateMap to ContainerManagerImpl.
+     */
+
+    containerStore.put(containerID, container);
+    containers.addContainer(container);
+    pipelineManager.addContainerToPipeline(pipelineID, containerID);
+    nextContainerID.incrementAndGet();
   }
 
   void updateContainerState(final ContainerID containerID,
@@ -337,7 +310,9 @@ public final class ContainerStateManagerImpl
     throw new UnsupportedOperationException("Not yet implemented!");
   }
 
-  void close() throws IOException {
+  @Override
+  public void close() throws Exception {
+    containerStore.close();
   }
 
   public static Builder newBuilder() {
@@ -382,7 +357,6 @@ public final class ContainerStateManagerImpl
 
       final ContainerStateManagerV2 csm = new ContainerStateManagerImpl(
           conf, pipelineMgr, table);
-      scmRatisServer.registerStateMachineHandler(RequestType.CONTAINER, csm);
 
       final SCMHAInvocationHandler invocationHandler =
           new SCMHAInvocationHandler(RequestType.CONTAINER, csm,
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerV2.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerV2.java
index 9960354..3520b01 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerV2.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerV2.java
@@ -17,20 +17,26 @@
 
 package org.apache.hadoop.hdds.scm.container;
 
+import java.io.IOException;
+import java.util.Set;
+
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.scm.metadata.Replicate;
 
-import java.io.IOException;
-import java.util.Set;
-
 /**
+ * A ContainerStateManager is responsible for keeping track of all the
+ * containers and their states inside SCM. It also exposes methods to read and
+ * modify the containers and their states.
  *
- * TODO: Add proper javadoc.
+ * All the mutation operations are marked with {@link Replicate} annotation so
+ * that when SCM-HA is enabled, the mutations are replicated from leader SCM
+ * to the followers.
  *
- * Implementation of methods marked with {@code @Replicate} annotation should be
+ * When a method is marked with {@link Replicate} annotation it should follow
+ * the below rules.
  *
- * 1. Idempotent
+ * 1. The method call should be idempotent
  * 2. Arguments should be of protobuf objects
  * 3. Return type should be of protobuf object
  * 4. The declaration should throw RaftException
@@ -38,13 +44,65 @@ import java.util.Set;
  */
 public interface ContainerStateManagerV2 {
 
+  //TODO: Rename this to ContainerStateManager
+
+  /* **********************************************************************
+   * Container Life Cycle                                                 *
+   *                                                                      *
+   * Event and State Transition Mapping:                                  *
+   *                                                                      *
+   * State: OPEN         ----------------> CLOSING                        *
+   * Event:                    FINALIZE                                   *
+   *                                                                      *
+   * State: CLOSING      ----------------> QUASI_CLOSED                   *
+   * Event:                  QUASI_CLOSE                                  *
+   *                                                                      *
+   * State: CLOSING      ----------------> CLOSED                         *
+   * Event:                     CLOSE                                     *
+   *                                                                      *
+   * State: QUASI_CLOSED ----------------> CLOSED                         *
+   * Event:                  FORCE_CLOSE                                  *
+   *                                                                      *
+   * State: CLOSED       ----------------> DELETING                       *
+   * Event:                    DELETE                                     *
+   *                                                                      *
+   * State: DELETING     ----------------> DELETED                        *
+   * Event:                    CLEANUP                                    *
+   *                                                                      *
+   *                                                                      *
+   * Container State Flow:                                                *
+   *                                                                      *
+   * [OPEN]--------------->[CLOSING]--------------->[QUASI_CLOSED]        *
+   *          (FINALIZE)      |      (QUASI_CLOSE)        |               *
+   *                          |                           |               *
+   *                          |                           |               *
+   *                  (CLOSE) |             (FORCE_CLOSE) |               *
+   *                          |                           |               *
+   *                          |                           |               *
+   *                          +--------->[CLOSED]<--------+               *
+   *                                        |                             *
+   *                                (DELETE)|                             *
+   *                                        |                             *
+   *                                        |                             *
+   *                                   [DELETING]                         *
+   *                                        |                             *
+   *                              (CLEANUP) |                             *
+   *                                        |                             *
+   *                                        V                             *
+   *                                    [DELETED]                         *
+   *                                                                      *
+   ************************************************************************/
+
   /**
-   *
+   * Returns a new container ID which can be used for allocating a new
+   * container.
    */
   ContainerID getNextContainerID();
 
   /**
+   * Returns the ID of all the managed containers.
    *
+   * @return Set of {@link ContainerID}
    */
   Set<ContainerID> getContainerIDs();
 
@@ -72,4 +130,8 @@ public interface ContainerStateManagerV2 {
   void addContainer(ContainerInfoProto containerInfo)
       throws IOException;
 
+  /**
+   * Closes the ContainerStateManager and releases the resources held by it.
+   */
+  void close() throws Exception;
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
index 8cef966..d71049b 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
@@ -535,6 +535,7 @@ public class ContainerStateMap {
     }
   }
 
+  // TODO: Move container not found exception to upper layer.
   private void checkIfContainerExist(ContainerID containerID)
       throws ContainerNotFoundException {
     if (!containerMap.containsKey(containerID)) {
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAInvocationHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAInvocationHandler.java
index c78c616..cbe2ce3 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAInvocationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAInvocationHandler.java
@@ -50,6 +50,7 @@ public class SCMHAInvocationHandler implements InvocationHandler {
     this.requestType = requestType;
     this.localHandler = localHandler;
     this.ratisHandler = ratisHandler;
+    ratisHandler.registerStateMachineHandler(requestType, localHandler);
   }
 
   @Override
@@ -71,8 +72,9 @@ public class SCMHAInvocationHandler implements InvocationHandler {
    */
   private Object invokeLocal(Method method, Object[] args)
       throws InvocationTargetException, IllegalAccessException {
-    LOG.trace("Invoking method {} on target {}", method, localHandler);
-    return method.invoke(method, args);
+    LOG.trace("Invoking method {} on target {} with arguments {}",
+        method, localHandler, args);
+    return method.invoke(localHandler, args);
   }
 
   /**
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
index b38fc43..eb6c800 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
@@ -17,60 +17,30 @@
 
 package org.apache.hadoop.hdds.scm.ha;
 
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
-
 import java.io.IOException;
 
 /**
  * SCMHAManager provides HA service for SCM.
- *
- * It uses Apache Ratis for HA implementation. We will have a 2N+1
- * node Ratis ring. The Ratis ring will have one Leader node and 2N follower
- * nodes.
- *
- * TODO
- *
  */
-public class SCMHAManager {
-
-  private static boolean isLeader = true;
-
-  private final SCMRatisServer ratisServer;
-
-  /**
-   * Creates SCMHAManager instance.
-   */
-  public SCMHAManager(final ConfigurationSource conf) throws IOException {
-    this.ratisServer = new SCMRatisServer(
-        conf.getObject(SCMHAConfiguration.class), conf);
-  }
+public interface SCMHAManager {
 
   /**
    * Starts HA service.
    */
-  public void start() throws IOException {
-    ratisServer.start();
-  }
+  void start() throws IOException;
 
   /**
    * Returns true if the current SCM is the leader.
    */
-  public static boolean isLeader() {
-    return isLeader;
-  }
+  boolean isLeader();
 
   /**
    * Returns RatisServer instance associated with the SCM instance.
    */
-  public SCMRatisServer getRatisServer() {
-    return ratisServer;
-  }
+  SCMRatisServer getRatisServer();
 
   /**
    * Stops the HA service.
    */
-  public void shutdown() throws IOException {
-    ratisServer.stop();
-  }
-
+  void shutdown() throws IOException;
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
similarity index 74%
copy from 
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
copy to 
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
index b38fc43..89ac714 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
@@ -22,53 +22,55 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import java.io.IOException;
 
 /**
- * SCMHAManager provides HA service for SCM.
- *
- * It uses Apache Ratis for HA implementation. We will have a 2N+1
+ * SCMHAManagerImpl uses Apache Ratis for HA implementation. We will have a 2N+1
  * node Ratis ring. The Ratis ring will have one Leader node and 2N follower
  * nodes.
  *
  * TODO
  *
  */
-public class SCMHAManager {
+public class SCMHAManagerImpl implements SCMHAManager {
 
   private static boolean isLeader = true;
 
-  private final SCMRatisServer ratisServer;
+  private final SCMRatisServerImpl ratisServer;
 
   /**
    * Creates SCMHAManager instance.
    */
-  public SCMHAManager(final ConfigurationSource conf) throws IOException {
-    this.ratisServer = new SCMRatisServer(
+  public SCMHAManagerImpl(final ConfigurationSource conf) throws IOException {
+    this.ratisServer = new SCMRatisServerImpl(
         conf.getObject(SCMHAConfiguration.class), conf);
   }
 
   /**
-   * Starts HA service.
+   * {@inheritDoc}
    */
+  @Override
   public void start() throws IOException {
     ratisServer.start();
   }
 
   /**
-   * Returns true if the current SCM is the leader.
+   * {@inheritDoc}
    */
-  public static boolean isLeader() {
+  @Override
+  public boolean isLeader() {
     return isLeader;
   }
 
   /**
-   * Returns RatisServer instance associated with the SCM instance.
+   * {@inheritDoc}
    */
+  @Override
   public SCMRatisServer getRatisServer() {
     return ratisServer;
   }
 
   /**
-   * Stops the HA service.
+   * {@inheritDoc}
    */
+  @Override
   public void shutdown() throws IOException {
     ratisServer.stop();
   }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisResponse.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisResponse.java
index c4bedcc..21ca4be 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisResponse.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisResponse.java
@@ -67,6 +67,10 @@ public final class SCMRatisResponse {
   public static Message encode(final Object result)
       throws InvalidProtocolBufferException {
 
+    if (result == null) {
+      return Message.EMPTY;
+    }
+
     final ByteString value;
     if (result instanceof GeneratedMessage) {
       value = ((GeneratedMessage) result).toByteString();
@@ -98,6 +102,10 @@ public final class SCMRatisResponse {
 
   private static Object deserializeResult(byte[] response)
       throws InvalidProtocolBufferException {
+    if (response.length == 0) {
+      return null;
+    }
+
     final SCMRatisResponseProto responseProto =
         SCMRatisResponseProto.parseFrom(response);
     try {
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java
index 209535d..4ddbc7b 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java
@@ -17,93 +17,22 @@
 
 package org.apache.hadoop.hdds.scm.ha;
 
+import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
+
 import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
-import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
-import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.protocol.RaftClientRequest;
-import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.protocol.RaftGroupId;
-import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.server.RaftServer;
 
 /**
  * TODO.
  */
-public class SCMRatisServer {
-
-  private final InetSocketAddress address;
-  private final RaftServer server;
-  private final RaftGroupId raftGroupId;
-  private final RaftGroup raftGroup;
-  private final RaftPeerId raftPeerId;
-  private final SCMStateMachine scmStateMachine;
-  private final ClientId clientId = ClientId.randomId();
-  private final AtomicLong callId = new AtomicLong();
+public interface SCMRatisServer {
 
+  void start() throws IOException;
 
-  // TODO: Refactor and remove ConfigurationSource and use only
-  //  SCMHAConfiguration.
-  SCMRatisServer(final SCMHAConfiguration haConf,
-                 final ConfigurationSource conf)
-      throws IOException {
-    final String scmServiceId = "SCM-HA-Service";
-    final String scmNodeId = "localhost";
-    this.raftPeerId = RaftPeerId.getRaftPeerId(scmNodeId);
-    this.address = haConf.getRatisBindAddress();
-    final RaftPeer localRaftPeer = new RaftPeer(raftPeerId, address);
-    final List<RaftPeer> raftPeers = new ArrayList<>();
-    raftPeers.add(localRaftPeer);
-    final RaftProperties serverProperties = RatisUtil
-        .newRaftProperties(haConf, conf);
-    this.raftGroupId = RaftGroupId.valueOf(
-        UUID.nameUUIDFromBytes(scmServiceId.getBytes(StandardCharsets.UTF_8)));
-    this.raftGroup = RaftGroup.valueOf(raftGroupId, raftPeers);
-    this.scmStateMachine = new SCMStateMachine();
-    this.server = RaftServer.newBuilder()
-        .setServerId(raftPeerId)
-        .setGroup(raftGroup)
-        .setProperties(serverProperties)
-        .setStateMachine(scmStateMachine)
-        .build();
-  }
-
-  void start() throws IOException {
-    server.start();
-  }
-
-  public void registerStateMachineHandler(final RequestType handlerType,
-                                          final Object handler) {
-    scmStateMachine.registerHandler(handlerType, handler);
-  }
+  void registerStateMachineHandler(RequestType handlerType, Object handler);
 
   SCMRatisResponse submitRequest(SCMRatisRequest request)
-      throws IOException, ExecutionException, InterruptedException {
-    final RaftClientRequest raftClientRequest = new RaftClientRequest(
-        clientId, server.getId(), raftGroupId, nextCallId(), request.encode(),
-        RaftClientRequest.writeRequestType(), null);
-    final RaftClientReply raftClientReply =
-        server.submitClientRequestAsync(raftClientRequest).get();
-    return SCMRatisResponse.decode(raftClientReply);
-  }
-
-  private long nextCallId() {
-    return callId.getAndIncrement() & Long.MAX_VALUE;
-  }
-
-  void stop() throws IOException {
-    server.close();
-  }
+      throws IOException, ExecutionException, InterruptedException;
 
+  void stop() throws IOException;
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
similarity index 91%
copy from 
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java
copy to 
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
index 209535d..45ae212 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
@@ -41,7 +41,7 @@ import org.apache.ratis.server.RaftServer;
 /**
  * TODO.
  */
-public class SCMRatisServer {
+public class SCMRatisServerImpl implements SCMRatisServer {
 
   private final InetSocketAddress address;
   private final RaftServer server;
@@ -55,8 +55,8 @@ public class SCMRatisServer {
 
   // TODO: Refactor and remove ConfigurationSource and use only
   //  SCMHAConfiguration.
-  SCMRatisServer(final SCMHAConfiguration haConf,
-                 final ConfigurationSource conf)
+  SCMRatisServerImpl(final SCMHAConfiguration haConf,
+                     final ConfigurationSource conf)
       throws IOException {
     final String scmServiceId = "SCM-HA-Service";
     final String scmNodeId = "localhost";
@@ -79,16 +79,19 @@ public class SCMRatisServer {
         .build();
   }
 
-  void start() throws IOException {
+  @Override
+  public void start() throws IOException {
     server.start();
   }
 
+  @Override
   public void registerStateMachineHandler(final RequestType handlerType,
                                           final Object handler) {
     scmStateMachine.registerHandler(handlerType, handler);
   }
 
-  SCMRatisResponse submitRequest(SCMRatisRequest request)
+  @Override
+  public SCMRatisResponse submitRequest(SCMRatisRequest request)
       throws IOException, ExecutionException, InterruptedException {
     final RaftClientRequest raftClientRequest = new RaftClientRequest(
         clientId, server.getId(), raftGroupId, nextCallId(), request.encode(),
@@ -102,7 +105,8 @@ public class SCMRatisServer {
     return callId.getAndIncrement() & Long.MAX_VALUE;
   }
 
-  void stop() throws IOException {
+  @Override
+  public void stop() throws IOException {
     server.close();
   }
 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java
new file mode 100644
index 0000000..022d392
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container;
+
+import java.io.File;
+import java.util.UUID;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
+import org.apache.hadoop.hdds.scm.pipeline.MockPipelineManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
+import org.apache.hadoop.ozone.container.common.SCMTestUtils;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+
+/**
+ * Tests to verify the functionality of ContainerManager.
+ */
+public class TestContainerManagerImpl {
+
+  private File testDir;
+  private DBStore dbStore;
+  private ContainerManagerV2 containerManager;
+
+  @Before
+  public void setUp() throws Exception {
+    final OzoneConfiguration conf = SCMTestUtils.getConf();
+    testDir = GenericTestUtils.getTestDir(
+        TestContainerManagerImpl.class.getSimpleName() + UUID.randomUUID());
+    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+    dbStore = DBStoreBuilder.createDBStore(
+        conf, new SCMDBDefinition());
+    final PipelineManager pipelineManager = MockPipelineManager.getInstance();
+    pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
+        HddsProtos.ReplicationFactor.THREE);
+    containerManager = new ContainerManagerImpl(conf,
+        MockSCMHAManager.getInstance(), pipelineManager,
+        SCMDBDefinition.CONTAINERS.getTable(dbStore));
+  }
+
+  @After
+  public void cleanup() throws Exception {
+    if(containerManager != null) {
+      containerManager.close();
+    }
+
+    if (dbStore != null) {
+      dbStore.close();
+    }
+
+    FileUtil.fullyDelete(testDir);
+  }
+
+  @Test
+  public void testAllocateContainer() throws Exception {
+    Assert.assertTrue(containerManager.getContainerIDs().isEmpty());
+    final ContainerInfo container = containerManager.allocateContainer(
+        HddsProtos.ReplicationType.RATIS,
+        HddsProtos.ReplicationFactor.THREE, "admin");
+    Assert.assertEquals(1, containerManager.getContainerIDs().size());
+    Assert.assertNotNull(containerManager.getContainer(
+        container.containerID()));
+  }
+
+}
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
new file mode 100644
index 0000000..c3b14fb
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftGroupMemberId;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.StateMachineException;
+
+/**
+ * Mock SCMHAManager implementation for testing.
+ */
+public final class MockSCMHAManager implements SCMHAManager {
+
+  private final SCMRatisServer ratisServer;
+
+  public static SCMHAManager getInstance() {
+    return new MockSCMHAManager();
+  }
+
+  /**
+   * Creates MockSCMHAManager instance.
+   */
+  private MockSCMHAManager() {
+    this.ratisServer = new MockRatisServer();
+  }
+
+  @Override
+  public void start() throws IOException {
+    ratisServer.start();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public boolean isLeader() {
+    return true;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public SCMRatisServer getRatisServer() {
+    return ratisServer;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void shutdown() throws IOException {
+    ratisServer.stop();
+  }
+
+  private static class MockRatisServer implements SCMRatisServer {
+
+    private Map<RequestType, Object> handlers =
+        new EnumMap<>(RequestType.class);
+
+    @Override
+    public void start() {
+    }
+
+    @Override
+    public void registerStateMachineHandler(final RequestType handlerType,
+                                            final Object handler) {
+      handlers.put(handlerType, handler);
+    }
+
+    @Override
+    public SCMRatisResponse submitRequest(final SCMRatisRequest request)
+        throws IOException {
+      final RaftGroupMemberId raftId = RaftGroupMemberId.valueOf(
+          RaftPeerId.valueOf("peer"), RaftGroupId.randomId());
+      // A handler failure becomes a failed reply with a StateMachineException.
+      try {
+        final Message result = process(request);
+        return SCMRatisResponse.decode(new RaftClientReply(ClientId.randomId(),
+            raftId, 1L, true, result, null, 1L, null));
+      } catch (Exception ex) {
+        return SCMRatisResponse.decode(new RaftClientReply(ClientId.randomId(),
+            raftId, 1L, false, null,
+            new StateMachineException(raftId, ex), 1L, null));
+      }
+    }
+
+    private Message process(final SCMRatisRequest request)
+        throws Exception {
+      try {
+        final Object handler = handlers.get(request.getType());
+
+        if (handler == null) {
+          throw new IOException("No handler found for request type " +
+              request.getType());
+        }
+        // Look the method up by the runtime classes of the arguments.
+        final List<Class<?>> argumentTypes = new ArrayList<>();
+        for (Object arg : request.getArguments()) {
+          argumentTypes.add(arg.getClass());
+        }
+        final Object result = handler.getClass().getMethod(
+            request.getOperation(), argumentTypes.toArray(new Class<?>[0]))
+            .invoke(handler, request.getArguments());
+        return SCMRatisResponse.encode(result);
+      } catch (NoSuchMethodException | SecurityException ex) {
+        throw new InvalidProtocolBufferException(ex.getMessage());
+      } catch (InvocationTargetException e) {
+        // Unwrap only Exceptions; an Error or null target stays wrapped in e.
+        final Throwable targetEx = e.getTargetException();
+        throw targetEx instanceof Exception ? (Exception) targetEx : e;
+      }
+    }
+
+    @Override
+    public void stop() {
+    }
+  }
+
+}
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
new file mode 100644
index 0000000..5dd6082
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Mock PipelineManager implementation for testing.
+ */
+public final class MockPipelineManager implements PipelineManager {
+
+  private PipelineStateManager stateManager;
+
+  public static PipelineManager getInstance() {
+    return new MockPipelineManager();
+  }
+
+  private MockPipelineManager() {
+    this.stateManager = new PipelineStateManager();
+  }
+
+  @Override
+  public Pipeline createPipeline(final ReplicationType type,
+                                 final ReplicationFactor factor)
+      throws IOException {
+    final List<DatanodeDetails> nodes = Stream.generate(
+        MockDatanodeDetails::randomDatanodeDetails)
+        .limit(factor.getNumber()).collect(Collectors.toList());
+    final Pipeline pipeline = Pipeline.newBuilder()
+        .setId(PipelineID.randomId())
+        .setType(type)
+        .setFactor(factor)
+        .setNodes(nodes)
+        .setState(Pipeline.PipelineState.OPEN)
+        .build();
+    stateManager.addPipeline(pipeline);
+    return pipeline;
+  }
+
+  @Override
+  public Pipeline createPipeline(final ReplicationType type,
+                                 final ReplicationFactor factor,
+                                 final List<DatanodeDetails> nodes) {
+    return Pipeline.newBuilder()
+        .setId(PipelineID.randomId())
+        .setType(type)
+        .setFactor(factor)
+        .setNodes(nodes)
+        .setState(Pipeline.PipelineState.OPEN)
+        .build();
+  }
+
+  @Override
+  public Pipeline getPipeline(final PipelineID pipelineID)
+      throws PipelineNotFoundException {
+    return stateManager.getPipeline(pipelineID);
+  }
+
+  @Override
+  public boolean containsPipeline(final PipelineID pipelineID) {
+    try {
+      return stateManager.getPipeline(pipelineID) != null;
+    } catch (PipelineNotFoundException e) {
+      return false;
+    }
+  }
+
+  @Override
+  public List<Pipeline> getPipelines() {
+    return stateManager.getPipelines();
+  }
+
+  @Override
+  public List<Pipeline> getPipelines(final ReplicationType type) {
+    return stateManager.getPipelines(type);
+  }
+
+  @Override
+  public List<Pipeline> getPipelines(final ReplicationType type,
+                                     final ReplicationFactor factor) {
+    return stateManager.getPipelines(type, factor);
+  }
+
+  @Override
+  public List<Pipeline> getPipelines(final ReplicationType type,
+                                     final Pipeline.PipelineState state) {
+    return stateManager.getPipelines(type, state);
+  }
+
+  @Override
+  public List<Pipeline> getPipelines(final ReplicationType type,
+                                     final ReplicationFactor factor,
+                                     final Pipeline.PipelineState state) {
+    return stateManager.getPipelines(type, factor, state);
+  }
+
+  @Override
+  public List<Pipeline> getPipelines(final ReplicationType type,
+      final ReplicationFactor factor, final Pipeline.PipelineState state,
+      final Collection<DatanodeDetails> excludeDns,
+      final Collection<PipelineID> excludePipelines) {
+    return stateManager.getPipelines(type, factor, state,
+        excludeDns, excludePipelines);
+  }
+
+  @Override
+  public void addContainerToPipeline(final PipelineID pipelineID,
+                                     final ContainerID containerID)
+      throws IOException {
+    stateManager.addContainerToPipeline(pipelineID, containerID);
+  }
+
+  @Override
+  public void removeContainerFromPipeline(final PipelineID pipelineID,
+                                          final ContainerID containerID)
+      throws IOException {
+    stateManager.removeContainerFromPipeline(pipelineID, containerID);
+  }
+
+  @Override
+  public NavigableSet<ContainerID> getContainersInPipeline(
+      final PipelineID pipelineID) throws IOException {
+    return stateManager.getContainers(pipelineID);
+  }
+
+  @Override
+  public int getNumberOfContainers(final PipelineID pipelineID)
+      throws IOException {
+    return getContainersInPipeline(pipelineID).size();
+  }
+
+  @Override
+  public void openPipeline(final PipelineID pipelineId)
+      throws IOException {
+    stateManager.openPipeline(pipelineId);
+  }
+
+  @Override
+  public void finalizeAndDestroyPipeline(final Pipeline pipeline,
+                                         final boolean onTimeout)
+      throws IOException {
+    stateManager.finalizePipeline(pipeline.getId());
+  }
+
+  @Override
+  public void scrubPipeline(final ReplicationType type,
+                            final ReplicationFactor factor)
+      throws IOException {
+
+  }
+
+  @Override
+  public void startPipelineCreator() {
+
+  }
+
+  @Override
+  public void triggerPipelineCreation() {
+
+  }
+
+  @Override
+  public void incNumBlocksAllocatedMetric(final PipelineID id) {
+
+  }
+
+  @Override
+  public void activatePipeline(final PipelineID pipelineID)
+      throws IOException {
+
+  }
+
+  @Override
+  public void deactivatePipeline(final PipelineID pipelineID)
+      throws IOException {
+    stateManager.deactivatePipeline(pipelineID);
+  }
+
+  @Override
+  public boolean getSafeModeStatus() {
+    return false;
+  }
+
+  @Override
+  public void close() throws IOException {
+
+  }
+
+  @Override
+  public Map<String, Integer> getPipelineInfo() {
+    return null;
+  }
+
+  @Override
+  public void onMessage(final SCMSafeModeManager.SafeModeStatus safeModeStatus,
+                        final EventPublisher publisher) {
+
+  }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to