Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7240 c8a8ee500 -> 74484754a


HDFS-13070. Ozone: SCM: Support for container replica reconciliation - 1. 
Contributed by Nanda kumar.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/74484754
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/74484754
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/74484754

Branch: refs/heads/HDFS-7240
Commit: 74484754ac30300fce5b5682ee4ba464dfe3108d
Parents: c8a8ee5
Author: Nanda kumar <na...@apache.org>
Authored: Wed Feb 21 00:40:00 2018 +0530
Committer: Nanda kumar <na...@apache.org>
Committed: Wed Feb 21 00:40:00 2018 +0530

----------------------------------------------------------------------
 .../org/apache/hadoop/scm/ScmConfigKeys.java    |   6 +
 .../ozone/scm/StorageContainerManager.java      |   4 +-
 .../ozone/scm/container/ContainerMapping.java   |  27 +-
 .../hadoop/ozone/scm/container/Mapping.java     |  14 +-
 .../ContainerReplicationManager.java            | 308 -----------------
 .../replication/ContainerSupervisor.java        | 333 +++++++++++++++++++
 .../container/replication/InProgressPool.java   |  40 ++-
 .../hadoop/ozone/scm/node/NodeManager.java      |   6 +
 .../hadoop/ozone/scm/node/SCMNodeManager.java   |   5 +
 .../TestUtils/ReplicationNodeManagerMock.java   |  20 +-
 .../TestContainerReplicationManager.java        | 264 ---------------
 .../replication/TestContainerSupervisor.java    | 269 +++++++++++++++
 .../ozone/scm/container/MockNodeManager.java    |   7 +
 .../scm/container/TestContainerMapping.java     |  18 +-
 14 files changed, 703 insertions(+), 618 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/74484754/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
index 6f5c873..fbe9637 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
@@ -224,6 +224,12 @@ public final class ScmConfigKeys {
       OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_DEFAULT = "60s";
 
   /**
+   * This determines the maximum number of node pools processed in parallel.
+   */
+  public static final String OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS =
+      "ozone.scm.max.nodepool.processing.threads";
+  public static final int OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS_DEFAULT = 1;
+  /**
    * These two settings control the number of threads in the executor pool
    * and timeouts for the container reports from all nodes.
    */
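
For context, a minimal sketch of how the new key is read (this mirrors the
ContainerSupervisor constructor later in this patch; the local variable name
is illustrative):

    // Maximum number of node pools reconciled in parallel; falls back to
    // the default of 1 when the key is unset.
    int inProgressPoolMaxCount = conf.getInt(
        ScmConfigKeys.OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS,
        ScmConfigKeys.OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS_DEFAULT);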

http://git-wip-us.apache.org/repos/asf/hadoop/blob/74484754/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
index 3276db8..4245fe0 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
@@ -984,9 +984,7 @@ public class StorageContainerManager extends 
ServiceRuntimeInfoImpl
     updateContainerReportMetrics(reports);
 
     // should we process container reports async?
-    scmContainerManager.processContainerReports(
-        DatanodeID.getFromProtoBuf(reports.getDatanodeID()),
-        reports.getType(), reports.getReportsList());
+    scmContainerManager.processContainerReports(reports);
     return ContainerReportsResponseProto.newBuilder().build();
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/74484754/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
index fe86064..8a82c82 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
@@ -20,7 +20,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.lease.Lease;
 import org.apache.hadoop.ozone.lease.LeaseException;
@@ -29,7 +28,9 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
 import 
org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos;
-import 
org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
+import org.apache.hadoop.ozone.scm.container.replication.ContainerSupervisor;
 import org.apache.hadoop.ozone.scm.exceptions.SCMException;
 import org.apache.hadoop.ozone.scm.node.NodeManager;
 import org.apache.hadoop.ozone.scm.pipelines.PipelineSelector;
@@ -74,6 +75,7 @@ public class ContainerMapping implements Mapping {
   private final PipelineSelector pipelineSelector;
   private final ContainerStateManager containerStateManager;
   private final LeaseManager<ContainerInfo> containerLeaseManager;
+  private final ContainerSupervisor containerSupervisor;
   private final float containerCloseThreshold;
 
   /**
@@ -113,6 +115,9 @@ public class ContainerMapping implements Mapping {
     this.pipelineSelector = new PipelineSelector(nodeManager, conf);
     this.containerStateManager =
         new ContainerStateManager(conf, this);
+    this.containerSupervisor =
+        new ContainerSupervisor(conf, nodeManager,
+            nodeManager.getNodePoolManager());
     this.containerCloseThreshold = conf.getFloat(
         ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD,
         ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD_DEFAULT);
@@ -347,16 +352,14 @@ public class ContainerMapping implements Mapping {
   /**
    * Process container report from Datanode.
    *
-   * @param datanodeID Datanode ID
-   * @param reportType Type of report
-   * @param containerInfos container details
+   * @param reports Container report
    */
   @Override
-  public void processContainerReports(
-      DatanodeID datanodeID,
-      ContainerReportsRequestProto.reportType reportType,
-      List<StorageContainerDatanodeProtocolProtos.ContainerInfo>
-          containerInfos) throws IOException {
+  public void processContainerReports(ContainerReportsRequestProto reports)
+      throws IOException {
+    List<StorageContainerDatanodeProtocolProtos.ContainerInfo>
+        containerInfos = reports.getReportsList();
+    containerSupervisor.handleContainerReport(reports);
     for (StorageContainerDatanodeProtocolProtos.ContainerInfo containerInfo :
         containerInfos) {
       byte[] dbKey = containerInfo.getContainerNameBytes().toByteArray();
@@ -395,7 +398,7 @@ public class ContainerMapping implements Mapping {
           // TODO: Handling of containers which are already in close queue.
           if (containerUsedPercentage >= containerCloseThreshold) {
             // TODO: The container has to be moved to close container queue.
-            // For now, we are just updating the container state to CLOSED.
+            // For now, we are just updating the container state to CLOSING.
             // Close container implementation can decide on how to maintain
             // list of containers to be closed, this is the place where we
             // have to add the containers to that list.
@@ -412,7 +415,7 @@ public class ContainerMapping implements Mapping {
           // Container not found in our container db.
           LOG.error("Error while processing container report from datanode :" +
               " {}, for container: {}, reason: container doesn't exist in" +
-              "container database.", datanodeID,
+              "container database.", reports.getDatanodeID(),
               containerInfo.getContainerName());
         }
       } finally {
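
A hedged sketch of what the single proto argument now carries, using only
accessors visible in this diff; the wrapper method and its name are
hypothetical, and the types come from the imports shown elsewhere in this
patch:

    void inspectReport(ContainerReportsRequestProto reports) {
      // The report bundles the sender and the per-container details, so
      // the datanode ID no longer travels as a separate parameter.
      DatanodeID datanodeID =
          DatanodeID.getFromProtoBuf(reports.getDatanodeID());
      List<StorageContainerDatanodeProtocolProtos.ContainerInfo>
          containerInfos = reports.getReportsList();
    }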

http://git-wip-us.apache.org/repos/asf/hadoop/blob/74484754/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java
index 577571f..0d442d1 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java
@@ -17,11 +17,8 @@
 package org.apache.hadoop.ozone.scm.container;
 
 
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
 import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerDatanodeProtocolProtos;
-import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
 import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
 
@@ -102,14 +99,9 @@ public interface Mapping extends Closeable {
   /**
    * Process container report from Datanode.
    *
-   * @param datanodeID Datanode ID
-   * @param reportType Type of report
-   * @param containerInfos container details
+   * @param reports Container report
    */
-  void processContainerReports(
-      DatanodeID datanodeID,
-      ContainerReportsRequestProto.reportType reportType,
-      List<StorageContainerDatanodeProtocolProtos.ContainerInfo>
-          containerInfos) throws IOException;
+  void processContainerReports(ContainerReportsRequestProto reports)
+      throws IOException;
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/74484754/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerReplicationManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerReplicationManager.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerReplicationManager.java
deleted file mode 100644
index f9b86f5..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerReplicationManager.java
+++ /dev/null
@@ -1,308 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-package org.apache.hadoop.ozone.scm.container.replication;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.conf.OzoneConfiguration;
-import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
-import org.apache.hadoop.ozone.scm.exceptions.SCMException;
-import org.apache.hadoop.ozone.scm.node.CommandQueue;
-import org.apache.hadoop.ozone.scm.node.NodeManager;
-import org.apache.hadoop.ozone.scm.node.NodePoolManager;
-import org.apache.hadoop.util.Time;
-import org.apache.hadoop.util.concurrent.HadoopExecutors;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.PriorityQueue;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static com.google.common.util.concurrent.Uninterruptibles
-    .sleepUninterruptibly;
-import static org.apache.hadoop.scm.ScmConfigKeys
-    .OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_DEFAULT;
-import static org.apache.hadoop.scm.ScmConfigKeys
-    .OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT;
-import static org.apache.hadoop.scm.ScmConfigKeys
-    .OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_DEFAULT;
-import static org.apache.hadoop.scm.ScmConfigKeys
-    .OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL;
-import static org.apache.hadoop.scm.ScmConfigKeys
-    .OZONE_SCM_MAX_CONTAINER_REPORT_THREADS;
-import static org.apache.hadoop.scm.ScmConfigKeys
-    .OZONE_SCM_MAX_CONTAINER_REPORT_THREADS_DEFAULT;
-
-/**
- * This class takes a set of container reports that belong to a pool and then
- * computes the replication levels for each container.
- */
-public class ContainerReplicationManager implements Closeable {
-  public static final Logger LOG =
-      LoggerFactory.getLogger(ContainerReplicationManager.class);
-
-  private final NodePoolManager poolManager;
-  private final CommandQueue commandQueue;
-  private final HashSet<String> poolNames;
-  private final PriorityQueue<PeriodicPool> poolQueue;
-  private final NodeManager nodeManager;
-  private final long containerProcessingLag;
-  private final AtomicBoolean runnable;
-  private final ExecutorService executorService;
-  private final long maxPoolWait;
-  private long poolProcessCount;
-  private final List<InProgressPool> inProgressPoolList;
-  private final AtomicInteger threadFaultCount;
-
-  /**
-   * Returns the number of times we have processed pools.
-   * @return long
-   */
-  public long getPoolProcessCount() {
-    return poolProcessCount;
-  }
-
-
-  /**
-   * Constructs a class that computes Replication Levels.
-   *
-   * @param conf - OzoneConfiguration
-   * @param nodeManager - Node Manager
-   * @param poolManager - Pool Manager
-   * @param commandQueue - Datanodes Command Queue.
-   */
-  public ContainerReplicationManager(OzoneConfiguration conf,
-      NodeManager nodeManager, NodePoolManager poolManager,
-      CommandQueue commandQueue) {
-    Preconditions.checkNotNull(poolManager);
-    Preconditions.checkNotNull(commandQueue);
-    Preconditions.checkNotNull(nodeManager);
-    this.containerProcessingLag =
-        conf.getTimeDuration(OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL,
-            OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_DEFAULT,
-            TimeUnit.SECONDS
-        ) * 1000;
-    int maxContainerReportThreads =
-        conf.getInt(OZONE_SCM_MAX_CONTAINER_REPORT_THREADS,
-            OZONE_SCM_MAX_CONTAINER_REPORT_THREADS_DEFAULT
-        );
-    this.maxPoolWait =
-        conf.getTimeDuration(OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT,
-            OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_DEFAULT,
-            TimeUnit.MILLISECONDS);
-    this.poolManager = poolManager;
-    this.commandQueue = commandQueue;
-    this.nodeManager = nodeManager;
-    this.poolNames = new HashSet<>();
-    this.poolQueue = new PriorityQueue<>();
-    runnable = new AtomicBoolean(true);
-    this.threadFaultCount = new AtomicInteger(0);
-    executorService = HadoopExecutors.newCachedThreadPool(
-        new ThreadFactoryBuilder().setDaemon(true)
-            .setNameFormat("Container Reports Processing Thread - %d")
-            .build(), maxContainerReportThreads);
-    inProgressPoolList = new LinkedList<>();
-
-    initPoolProcessThread();
-  }
-
-  /**
-   * Returns the number of pools that are being processed right now.
-   * @return int - Number of pools currently in process.
-   */
-  public int getInProgressPoolCount() {
-    return inProgressPoolList.size();
-  }
-
-  /**
-   * Exits the background thread.
-   */
-  public void setExit() {
-    this.runnable.set(false);
-  }
-
-  /**
-   * Adds or removes pools from names that we need to process.
-   *
-   * There are two different cases that we need to handle:
-   * pools being added and pools being removed.
-   */
-  private void refreshPools() {
-    List<String> pools = this.poolManager.getNodePools();
-    if (pools != null) {
-
-      HashSet<String> removedPools =
-          computePoolDifference(this.poolNames, new HashSet<>(pools));
-
-      HashSet<String> addedPools =
-          computePoolDifference(new HashSet<>(pools), this.poolNames);
-      // TODO: Support remove pool API in pool manager so that this code
-      // path can be tested. This never happens in the current code base.
-      for (String poolName : removedPools) {
-        for (PeriodicPool periodicPool : poolQueue) {
-          if (periodicPool.getPoolName().compareTo(poolName) == 0) {
-            poolQueue.remove(periodicPool);
-          }
-        }
-      }
-      // Remove the pool names that we have in the list.
-      this.poolNames.removeAll(removedPools);
-
-      for (String poolName : addedPools) {
-        poolQueue.add(new PeriodicPool(poolName));
-      }
-
-      // Add to the pool names we are tracking.
-      poolNames.addAll(addedPools);
-    }
-
-  }
-
-  /**
-   * Computes the pools that are present in newPools but not in oldPool.
-   *
-   * @param newPools - New pool list
-   * @param oldPool - Old pool list.
-   */
-  private HashSet<String> computePoolDifference(HashSet<String> newPools,
-      Set<String> oldPool) {
-    Preconditions.checkNotNull(newPools);
-    Preconditions.checkNotNull(oldPool);
-    HashSet<String> newSet = new HashSet<>(newPools);
-    newSet.removeAll(oldPool);
-    return newSet;
-  }
-
-  private void initPoolProcessThread() {
-
-    /*
-     * Task that runs to check if we need to start a pool processing job.
-     * If so, we create a pool reconciliation job and find out if all the
-     * expected containers are on the nodes.
-     */
-    Runnable processPools = () -> {
-      while (runnable.get()) {
-        // Make sure that we don't have any new pools.
-        refreshPools();
-        PeriodicPool pool = poolQueue.poll();
-        if (pool != null) {
-          if (pool.getLastProcessedTime() + this.containerProcessingLag <
-              Time.monotonicNow()) {
-            LOG.debug("Adding pool {} to container processing queue", pool
-                .getPoolName());
-            InProgressPool inProgressPool =  new InProgressPool(maxPoolWait,
-                pool, this.nodeManager, this.poolManager, this.commandQueue,
-                this.executorService);
-            inProgressPool.startReconciliation();
-            inProgressPoolList.add(inProgressPool);
-            poolProcessCount++;
-
-          } else {
-
-            LOG.debug("Not within the time window for processing: {}",
-                pool.getPoolName());
-            // Put back this pool since we are not planning to process it.
-            poolQueue.add(pool);
-            // we might over sleep here, not a big deal.
-            sleepUninterruptibly(this.containerProcessingLag,
-                TimeUnit.MILLISECONDS);
-          }
-        }
-        sleepUninterruptibly(this.maxPoolWait, TimeUnit.MILLISECONDS);
-      }
-    };
-
-    // We will have only one thread for pool processing.
-    Thread poolProcessThread = new Thread(processPools);
-    poolProcessThread.setDaemon(true);
-    poolProcessThread.setName("Pool replica thread");
-    poolProcessThread.setUncaughtExceptionHandler((Thread t, Throwable e) -> {
-      // Let us just restart this thread after logging a critical error.
-      // if this thread is not running we cannot handle commands from SCM.
-      LOG.error("Critical Error : Pool replica thread encountered an " +
-          "error. Thread: {} Error Count : {}", t.toString(), e,
-          threadFaultCount.incrementAndGet());
-      poolProcessThread.start();
-      // TODO : Add a config to restrict how many times we will restart this
-      // thread in a single session.
-    });
-    poolProcessThread.start();
-  }
-
-  /**
-   * Adds a container report to appropriate inProgress Pool.
-   * @param containerReport  -- Container report for a specific container from
-   * a datanode.
-   */
-  public void handleContainerReport(
-      ContainerReportsRequestProto containerReport) {
-    String poolName = null;
-    DatanodeID datanodeID = DatanodeID
-        .getFromProtoBuf(containerReport.getDatanodeID());
-    try {
-      poolName = poolManager.getNodePool(datanodeID);
-    } catch (SCMException e) {
-      LOG.warn("Skipping processing container report from datanode {}, "
-              + "cause: failed to get the corresponding node pool",
-          datanodeID.toString(), e);
-      return;
-    }
-
-    for(InProgressPool ppool : inProgressPoolList) {
-      if(ppool.getPoolName().equalsIgnoreCase(poolName)) {
-        ppool.handleContainerReport(containerReport);
-        return;
-      }
-    }
-    // TODO: Decide if we can do anything else with this report.
-    LOG.debug("Discarding the container report for pool {}. That pool is not " 
+
-        "currently in the pool reconciliation process. Container Name: {}",
-        poolName, containerReport.getDatanodeID());
-  }
-
-  /**
-   * Get in process pool list, used for testing.
-   * @return List of InProgressPool
-   */
-  @VisibleForTesting
-  public List<InProgressPool> getInProcessPoolList() {
-    return inProgressPoolList;
-  }
-
-  /**
-   * Shutdown the Container Replication Manager.
-   * @throws IOException if an I/O error occurs
-   */
-  @Override
-  public void close() throws IOException {
-    setExit();
-    HadoopExecutors.shutdown(executorService, LOG, 5, TimeUnit.SECONDS);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/74484754/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerSupervisor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerSupervisor.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerSupervisor.java
new file mode 100644
index 0000000..c063b8b
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerSupervisor.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package org.apache.hadoop.ozone.scm.container.replication;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
+import org.apache.hadoop.ozone.scm.exceptions.SCMException;
+import org.apache.hadoop.ozone.scm.node.NodeManager;
+import org.apache.hadoop.ozone.scm.node.NodePoolManager;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static com.google.common.util.concurrent.Uninterruptibles
+    .sleepUninterruptibly;
+import static org.apache.hadoop.scm.ScmConfigKeys
+    .OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.scm.ScmConfigKeys
+    .OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT;
+import static org.apache.hadoop.scm.ScmConfigKeys
+    .OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_DEFAULT;
+import static org.apache.hadoop.scm.ScmConfigKeys
+    .OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL;
+import static org.apache.hadoop.scm.ScmConfigKeys
+    .OZONE_SCM_MAX_CONTAINER_REPORT_THREADS;
+import static org.apache.hadoop.scm.ScmConfigKeys
+    .OZONE_SCM_MAX_CONTAINER_REPORT_THREADS_DEFAULT;
+import static org.apache.hadoop.scm.ScmConfigKeys
+    .OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS;
+import static org.apache.hadoop.scm.ScmConfigKeys
+    .OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS_DEFAULT;
+
+/**
+ * This class takes a set of container reports that belong to a pool and then
+ * computes the replication levels for each container.
+ */
+public class ContainerSupervisor implements Closeable {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(ContainerSupervisor.class);
+
+  private final NodePoolManager poolManager;
+  private final HashSet<String> poolNames;
+  private final PriorityQueue<PeriodicPool> poolQueue;
+  private final NodeManager nodeManager;
+  private final long containerProcessingLag;
+  private final AtomicBoolean runnable;
+  private final ExecutorService executorService;
+  private final long maxPoolWait;
+  private long poolProcessCount;
+  private final List<InProgressPool> inProgressPoolList;
+  private final AtomicInteger threadFaultCount;
+  private final int inProgressPoolMaxCount;
+
+  private final ReadWriteLock inProgressPoolListLock;
+
+  /**
+   * Returns the number of times we have processed pools.
+   * @return long
+   */
+  public long getPoolProcessCount() {
+    return poolProcessCount;
+  }
+
+
+  /**
+   * Constructs a class that computes Replication Levels.
+   *
+   * @param conf - OzoneConfiguration
+   * @param nodeManager - Node Manager
+   * @param poolManager - Pool Manager
+   */
+  public ContainerSupervisor(Configuration conf, NodeManager nodeManager,
+                             NodePoolManager poolManager) {
+    Preconditions.checkNotNull(poolManager);
+    Preconditions.checkNotNull(nodeManager);
+    this.containerProcessingLag =
+        conf.getTimeDuration(OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL,
+            OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_DEFAULT,
+            TimeUnit.SECONDS
+        ) * 1000;
+    int maxContainerReportThreads =
+        conf.getInt(OZONE_SCM_MAX_CONTAINER_REPORT_THREADS,
+            OZONE_SCM_MAX_CONTAINER_REPORT_THREADS_DEFAULT
+        );
+    this.maxPoolWait =
+        conf.getTimeDuration(OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT,
+            OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_DEFAULT,
+            TimeUnit.MILLISECONDS);
+    this.inProgressPoolMaxCount = conf.getInt(
+        OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS,
+        OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS_DEFAULT);
+    this.poolManager = poolManager;
+    this.nodeManager = nodeManager;
+    this.poolNames = new HashSet<>();
+    this.poolQueue = new PriorityQueue<>();
+    this.runnable = new AtomicBoolean(true);
+    this.threadFaultCount = new AtomicInteger(0);
+    this.executorService = HadoopExecutors.newCachedThreadPool(
+        new ThreadFactoryBuilder().setDaemon(true)
+            .setNameFormat("Container Reports Processing Thread - %d")
+            .build(), maxContainerReportThreads);
+    this.inProgressPoolList = new LinkedList<>();
+    this.inProgressPoolListLock = new ReentrantReadWriteLock();
+
+    initPoolProcessThread();
+  }
+
+  /**
+   * Returns the number of pools that are being processed right now.
+   * @return int - Number of pools currently in process.
+   */
+  public int getInProgressPoolCount() {
+    return inProgressPoolList.size();
+  }
+
+  /**
+   * Exits the background thread.
+   */
+  public void setExit() {
+    this.runnable.set(false);
+  }
+
+  /**
+   * Adds or removes pools from names that we need to process.
+   *
+   * There are two different cases that we need to handle:
+   * pools being added and pools being removed.
+   */
+  private void refreshPools() {
+    List<String> pools = this.poolManager.getNodePools();
+    if (pools != null) {
+
+      HashSet<String> removedPools =
+          computePoolDifference(this.poolNames, new HashSet<>(pools));
+
+      HashSet<String> addedPools =
+          computePoolDifference(new HashSet<>(pools), this.poolNames);
+      // TODO: Support remove pool API in pool manager so that this code
+      // path can be tested. This never happens in the current code base.
+      for (String poolName : removedPools) {
+        for (PeriodicPool periodicPool : poolQueue) {
+          if (periodicPool.getPoolName().compareTo(poolName) == 0) {
+            poolQueue.remove(periodicPool);
+          }
+        }
+      }
+      // Remove the pool names that we have in the list.
+      this.poolNames.removeAll(removedPools);
+
+      for (String poolName : addedPools) {
+        poolQueue.add(new PeriodicPool(poolName));
+      }
+
+      // Add to the pool names we are tracking.
+      poolNames.addAll(addedPools);
+    }
+
+  }
+
+  /**
+   * Computes the pools that are present in newPools but not in oldPool.
+   *
+   * @param newPools - New pool list
+   * @param oldPool - Old pool list.
+   */
+  private HashSet<String> computePoolDifference(HashSet<String> newPools,
+      Set<String> oldPool) {
+    Preconditions.checkNotNull(newPools);
+    Preconditions.checkNotNull(oldPool);
+    HashSet<String> newSet = new HashSet<>(newPools);
+    newSet.removeAll(oldPool);
+    return newSet;
+  }
+
+  private void initPoolProcessThread() {
+
+    /*
+     * Task that runs to check if we need to start a pool processing job.
+     * If so, we create a pool reconciliation job and find out if all the
+     * expected containers are on the nodes.
+     */
+    Runnable processPools = () -> {
+      while (runnable.get()) {
+        // Make sure that we don't have any new pools.
+        refreshPools();
+        while (inProgressPoolList.size() < inProgressPoolMaxCount) {
+          PeriodicPool pool = poolQueue.poll();
+          if (pool != null) {
+            if (pool.getLastProcessedTime() + this.containerProcessingLag >
+                Time.monotonicNow()) {
+              LOG.debug("Not within the time window for processing: {}",
+                  pool.getPoolName());
+              // we might over sleep here, not a big deal.
+              sleepUninterruptibly(this.containerProcessingLag,
+                  TimeUnit.MILLISECONDS);
+            }
+            LOG.debug("Adding pool {} to container processing queue",
+                pool.getPoolName());
+            InProgressPool inProgressPool = new InProgressPool(maxPoolWait,
+                pool, this.nodeManager, this.poolManager, this.executorService);
+            inProgressPool.startReconciliation();
+            inProgressPoolListLock.writeLock().lock();
+            try {
+              inProgressPoolList.add(inProgressPool);
+            } finally {
+              inProgressPoolListLock.writeLock().unlock();
+            }
+            poolProcessCount++;
+          } else {
+            break;
+          }
+        }
+        sleepUninterruptibly(this.maxPoolWait, TimeUnit.MILLISECONDS);
+        inProgressPoolListLock.readLock().lock();
+        try {
+          for (InProgressPool inProgressPool : inProgressPoolList) {
+            inProgressPool.finalizeReconciliation();
+            poolQueue.add(inProgressPool.getPool());
+          }
+        } finally {
+          inProgressPoolListLock.readLock().unlock();
+        }
+        inProgressPoolListLock.writeLock().lock();
+        try {
+          inProgressPoolList.clear();
+        } finally {
+          inProgressPoolListLock.writeLock().unlock();
+        }
+      }
+    };
+
+    // We will have only one thread for pool processing.
+    Thread poolProcessThread = new Thread(processPools);
+    poolProcessThread.setDaemon(true);
+    poolProcessThread.setName("Pool replica thread");
+    poolProcessThread.setUncaughtExceptionHandler((Thread t, Throwable e) -> {
+      // Let us just restart this thread after logging a critical error.
+      // if this thread is not running we cannot handle commands from SCM.
+      LOG.error("Critical Error : Pool replica thread encountered an " +
+          "error. Thread: {} Error Count : {}", t.toString(), e,
+          threadFaultCount.incrementAndGet());
+      poolProcessThread.start();
+      // TODO : Add a config to restrict how many times we will restart this
+      // thread in a single session.
+    });
+    poolProcessThread.start();
+  }
+
+  /**
+   * Adds a container report to appropriate inProgress Pool.
+   * @param containerReport  -- Container report for a specific container from
+   * a datanode.
+   */
+  public void handleContainerReport(
+      ContainerReportsRequestProto containerReport) {
+    DatanodeID datanodeID = DatanodeID.getFromProtoBuf(
+        containerReport.getDatanodeID());
+    inProgressPoolListLock.readLock().lock();
+    try {
+      String poolName = poolManager.getNodePool(datanodeID);
+      for (InProgressPool ppool : inProgressPoolList) {
+        if (ppool.getPoolName().equalsIgnoreCase(poolName)) {
+          ppool.handleContainerReport(containerReport);
+          return;
+        }
+      }
+      // TODO: Decide if we can do anything else with this report.
+      LOG.debug("Discarding the container report for pool {}. " +
+              "That pool is not currently in the pool reconciliation process." +
+              " Container Name: {}", poolName, containerReport.getDatanodeID());
+    } catch (SCMException e) {
+      LOG.warn("Skipping processing container report from datanode {}, "
+              + "cause: failed to get the corresponding node pool",
+          datanodeID.toString(), e);
+    } finally {
+      inProgressPoolListLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Get in process pool list, used for testing.
+   * @return List of InProgressPool
+   */
+  @VisibleForTesting
+  public List<InProgressPool> getInProcessPoolList() {
+    return inProgressPoolList;
+  }
+
+  /**
+   * Shuts down the Container Supervisor.
+   * @throws IOException if an I/O error occurs
+   */
+  @Override
+  public void close() throws IOException {
+    setExit();
+    HadoopExecutors.shutdown(executorService, LOG, 5, TimeUnit.SECONDS);
+  }
+}
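
A minimal wiring sketch for the new class, assuming only the constructor and
methods introduced above (ContainerMapping performs the same construction
earlier in this patch):

    // The supervisor pulls its pool list from the node manager's
    // NodePoolManager and runs pool reconciliation on a daemon thread.
    ContainerSupervisor supervisor = new ContainerSupervisor(
        conf, nodeManager, nodeManager.getNodePoolManager());

    // Reports are routed to the matching in-progress pool; reports for
    // pools not currently under reconciliation are logged and dropped.
    supervisor.handleContainerReport(reports);

    // Stops the pool processing loop and shuts down the executor
    // (may throw IOException).
    supervisor.close();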

http://git-wip-us.apache.org/repos/asf/hadoop/blob/74484754/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/InProgressPool.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/InProgressPool.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/InProgressPool.java
index 24423a3..833d1a8 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/InProgressPool.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/InProgressPool.java
@@ -21,9 +21,10 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.ozone.protocol.commands.SendContainerCommand;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState;
-import 
org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerInfo;
-import 
org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
-import org.apache.hadoop.ozone.scm.node.CommandQueue;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerInfo;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
 import org.apache.hadoop.ozone.scm.node.NodeManager;
 import org.apache.hadoop.ozone.scm.node.NodePoolManager;
 import org.apache.hadoop.util.Time;
@@ -39,10 +40,14 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
-import static 
com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
-import static 
org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.HEALTHY;
-import static 
org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.STALE;
-import static 
org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.UNKNOWN;
+import static com.google.common.util.concurrent.Uninterruptibles
+    .sleepUninterruptibly;
+import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos
+    .NodeState.HEALTHY;
+import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos
+    .NodeState.STALE;
+import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos
+    .NodeState.UNKNOWN;
 
 /**
  * These are pools that are actively checking for replication status of the
@@ -51,8 +56,8 @@ import static 
org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.UNKNO
 public final class InProgressPool {
   public static final Logger LOG =
       LoggerFactory.getLogger(InProgressPool.class);
+
   private final PeriodicPool pool;
-  private final CommandQueue commandQueue;
   private final NodeManager nodeManager;
   private final NodePoolManager poolManager;
   private final ExecutorService executorService;
@@ -70,22 +75,19 @@ public final class InProgressPool {
    * @param pool - Pool that we are working against
    * @param nodeManager - Nodemanager
    * @param poolManager - pool manager
-   * @param commandQueue - Command queue
    * @param executorService - Shared Executor service.
    */
   InProgressPool(long maxWaitTime, PeriodicPool pool,
       NodeManager nodeManager, NodePoolManager poolManager,
-      CommandQueue commandQueue, ExecutorService executorService) {
+                 ExecutorService executorService) {
     Preconditions.checkNotNull(pool);
     Preconditions.checkNotNull(nodeManager);
     Preconditions.checkNotNull(poolManager);
-    Preconditions.checkNotNull(commandQueue);
     Preconditions.checkNotNull(executorService);
     Preconditions.checkArgument(maxWaitTime > 0);
     this.pool = pool;
     this.nodeManager = nodeManager;
     this.poolManager = poolManager;
-    this.commandQueue = commandQueue;
     this.executorService = executorService;
     this.containerCountMap = new ConcurrentHashMap<>();
     this.processedNodeSet = new ConcurrentHashMap<>();
@@ -186,7 +188,7 @@ public final class InProgressPool {
         // Queue commands to all datanodes in this pool to send us container
        // report. Since we ignore dead nodes, it is possible that we would have
         // over replicated the container if the node comes back.
-        commandQueue.addCommand(id, cmd);
+        nodeManager.addDatanodeCommand(id, cmd);
       }
     }
     this.status = ProgressStatus.InProgress;
@@ -235,7 +237,12 @@ public final class InProgressPool {
    */
   public void handleContainerReport(
       ContainerReportsRequestProto containerReport) {
-    executorService.submit(processContainerReport(containerReport));
+    if (status == ProgressStatus.InProgress) {
+      executorService.submit(processContainerReport(containerReport));
+    } else {
+      LOG.debug("Cannot handle container report when the pool is in {} 
status.",
+          status);
+    }
   }
 
   private Runnable processContainerReport(
@@ -292,6 +299,11 @@ public final class InProgressPool {
     return pool.getPoolName();
   }
 
+  public void finalizeReconciliation() {
+    status = ProgressStatus.Done;
+    //TODO: Add finalizing logic. This is where actual reconciliation happens.
+  }
+
   /**
    * Current status of the computing replication status.
    */
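
The per-pool lifecycle implied by this change, as a short sketch; only
methods visible in this diff are used, and the calling sequence is the
supervisor's, condensed here:

    // Reports are accepted only while the pool status is InProgress;
    // once finalizeReconciliation() flips the status to Done, further
    // reports are logged at debug level and dropped.
    inProgressPool.startReconciliation();
    inProgressPool.handleContainerReport(containerReport);
    inProgressPool.finalizeReconciliation();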

http://git-wip-us.apache.org/repos/asf/hadoop/blob/74484754/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java
index 266428b..ce032b4 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java
@@ -123,6 +123,12 @@ public interface NodeManager extends 
StorageContainerNodeProtocol,
   SCMNodeMetric getNodeStat(DatanodeID datanodeID);
 
   /**
+   * Returns the NodePoolManager associated with the NodeManager.
+   * @return NodePoolManager
+   */
+  NodePoolManager getNodePoolManager();
+
+  /**
   * Waits until the heartbeat has been processed by the NodeManager.
    * @return true if heartbeat has been processed.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/74484754/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java
index ed894cb..129e65d 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java
@@ -858,6 +858,11 @@ public class SCMNodeManager
   }
 
   @Override
+  public NodePoolManager getNodePoolManager() {
+    return nodePoolManager;
+  }
+
+  @Override
   public Map<String, Integer> getNodeCount() {
     Map<String, Integer> nodeCountMap = new HashMap<String, Integer>();
     for(NodeState state : NodeState.values()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/74484754/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationNodeManagerMock.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationNodeManagerMock.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationNodeManagerMock.java
index 94f2a17..d9bf587 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationNodeManagerMock.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationNodeManagerMock.java
@@ -29,26 +29,32 @@ import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
 import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeMetric;
 import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeStat;
+import org.apache.hadoop.ozone.scm.node.CommandQueue;
 import org.apache.hadoop.ozone.scm.node.NodeManager;
 
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState;
+import org.apache.hadoop.ozone.scm.node.NodePoolManager;
+import org.mockito.Mockito;
 
 /**
  * A Node Manager to test replication.
  */
 public class ReplicationNodeManagerMock implements NodeManager {
   private final Map<DatanodeID, NodeState> nodeStateMap;
+  private final CommandQueue commandQueue;
 
   /**
    * A list of Datanodes and current states.
    * @param nodeState A node state map.
    */
-  public ReplicationNodeManagerMock(Map<DatanodeID, NodeState> nodeState) {
+  public ReplicationNodeManagerMock(Map<DatanodeID, NodeState> nodeState,
+                                    CommandQueue commandQueue) {
     Preconditions.checkNotNull(nodeState);
-    nodeStateMap = nodeState;
+    this.nodeStateMap = nodeState;
+    this.commandQueue = commandQueue;
   }
 
   /**
@@ -194,6 +200,11 @@ public class ReplicationNodeManagerMock implements 
NodeManager {
     return null;
   }
 
+  @Override
+  public NodePoolManager getNodePoolManager() {
+    return Mockito.mock(NodePoolManager.class);
+  }
+
   /**
   * Waits until the heartbeat has been processed by the NodeManager.
    *
@@ -304,4 +315,9 @@ public class ReplicationNodeManagerMock implements 
NodeManager {
     nodeStateMap.put(id, state);
   }
 
+  @Override
+  public void addDatanodeCommand(DatanodeID id, SCMCommand command) {
+    this.commandQueue.addCommand(id, command);
+  }
+
 }
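
Test-side construction after this change, sketched from the signatures above
(nodeStateMap is the Map<DatanodeID, NodeState> the mock already accepted):

    // The mock now needs a CommandQueue so that addDatanodeCommand(),
    // invoked by InProgressPool through the NodeManager interface, has
    // somewhere to enqueue the command.
    CommandQueue commandQueue = new CommandQueue();
    NodeManager nodeManager =
        new ReplicationNodeManagerMock(nodeStateMap, commandQueue);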

http://git-wip-us.apache.org/repos/asf/hadoop/blob/74484754/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerReplicationManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerReplicationManager.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerReplicationManager.java
deleted file mode 100644
index 0e36339..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerReplicationManager.java
+++ /dev/null
@@ -1,264 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-package org.apache.hadoop.ozone.container.replication;
-
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.conf.OzoneConfiguration;
-import org.apache.hadoop.ozone.container.TestUtils
-    .ReplicationDatanodeStateManager;
-import org.apache.hadoop.ozone.container.TestUtils.ReplicationNodeManagerMock;
-import org.apache.hadoop.ozone.container.TestUtils
-    .ReplicationNodePoolManagerMock;
-import org.apache.hadoop.ozone.container.common.SCMTestUtils;
-import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState;
-import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
-import org.apache.hadoop.ozone.scm.container.replication
-    .ContainerReplicationManager;
-import org.apache.hadoop.ozone.scm.container.replication.InProgressPool;
-import org.apache.hadoop.ozone.scm.node.CommandQueue;
-import org.apache.hadoop.ozone.scm.node.NodeManager;
-import org.apache.hadoop.ozone.scm.node.NodePoolManager;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
-import org.slf4j.event.Level;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static 
org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.HEALTHY;
-import static org.apache.hadoop.scm.ScmConfigKeys
-    .OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT;
-import static org.apache.ratis.shaded.com.google.common.util.concurrent
-    .Uninterruptibles.sleepUninterruptibly;
-
-/**
- * Tests for the container manager.
- */
-public class TestContainerReplicationManager {
-  final static String POOL_NAME_TEMPLATE = "Pool%d";
-  static final int MAX_DATANODES = 72;
-  static final int POOL_SIZE = 24;
-  static final int POOL_COUNT = 3;
-  private LogCapturer logCapturer = LogCapturer.captureLogs(
-      LogFactory.getLog(ContainerReplicationManager.class));
-  private List<DatanodeID> datanodes = new LinkedList<>();
-  private NodeManager nodeManager;
-  private NodePoolManager poolManager;
-  private CommandQueue commandQueue;
-  private ContainerReplicationManager replicationManager;
-  private ReplicationDatanodeStateManager datanodeStateManager;
-
-  @After
-  public void tearDown() throws Exception {
-    logCapturer.stopCapturing();
-    GenericTestUtils.setLogLevel(ContainerReplicationManager.LOG, Level.INFO);
-  }
-
-  @Before
-  public void setUp() throws Exception {
-    GenericTestUtils.setLogLevel(ContainerReplicationManager.LOG, Level.DEBUG);
-    Map<DatanodeID, NodeState> nodeStateMap = new HashMap<>();
-    // We are setting up 3 pools with 24 nodes each in this cluster.
-    // First we create 72 Datanodes.
-    for (int x = 0; x < MAX_DATANODES; x++) {
-      DatanodeID datanode = SCMTestUtils.getDatanodeID();
-      datanodes.add(datanode);
-      nodeStateMap.put(datanode, HEALTHY);
-    }
-
-    // All nodes in this cluster are healthy for time being.
-    nodeManager = new ReplicationNodeManagerMock(nodeStateMap);
-    poolManager = new ReplicationNodePoolManagerMock();
-    commandQueue = new CommandQueue();
-
-    Assert.assertEquals("Max datanodes should be equal to POOL_SIZE * " +
-        "POOL_COUNT", POOL_COUNT * POOL_SIZE, MAX_DATANODES);
-
-    // Start from 1 instead of zero so we can multiply and get the node index.
-    for (int y = 1; y <= POOL_COUNT; y++) {
-      String poolName = String.format(POOL_NAME_TEMPLATE, y);
-      for (int z = 0; z < POOL_SIZE; z++) {
-        DatanodeID id = datanodes.get(y * z);
-        poolManager.addNode(poolName, id);
-      }
-    }
-    OzoneConfiguration config = SCMTestUtils.getOzoneConf();
-    config.setTimeDuration(OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT, 1,
-        TimeUnit.SECONDS);
-    replicationManager = new ContainerReplicationManager(config,
-        nodeManager, poolManager, commandQueue);
-    datanodeStateManager = new ReplicationDatanodeStateManager(nodeManager,
-        poolManager);
-    // Sleep for one second to make sure all threads get time to run.
-    sleepUninterruptibly(1, TimeUnit.SECONDS);
-  }
-
-  @Test
-  /**
-   * Asserts that at least one pool is picked up for processing.
-   */
-  public void testAssertPoolsAreProcessed() {
-    // This asserts that replication manager has started processing at least
-    // one pool.
-    Assert.assertTrue(replicationManager.getInProgressPoolCount() > 0);
-
-    // Since all datanodes are flagged as healthy in this test, for each
-    // datanode we must have queued a command.
-    Assert.assertEquals("Commands are in queue :", commandQueue
-        .getCommandsInQueue(), POOL_SIZE * replicationManager
-        .getInProgressPoolCount());
-  }
-
-  @Test
-  /**
-   * This test sends container reports for 2 containers to a pool in progress.
-   * Asserts that we are able to find a container with single replica and do
-   * not find container with 3 replicas.
-   */
-  public void testDetectSingleContainerReplica() throws TimeoutException,
-      InterruptedException {
-    String singleNodeContainer = "SingleNodeContainer";
-    String threeNodeContainer = "ThreeNodeContainer";
-    InProgressPool ppool = replicationManager.getInProcessPoolList().get(0);
-    // Only single datanode reporting that "SingleNodeContainer" exists.
-    List<ContainerReportsRequestProto> clist =
-        datanodeStateManager.getContainerReport(singleNodeContainer,
-            ppool.getPool().getPoolName(), 1);
-    ppool.handleContainerReport(clist.get(0));
-
-    // Three nodes are going to report that ThreeNodeContainer  exists.
-    clist = datanodeStateManager.getContainerReport(threeNodeContainer,
-        ppool.getPool().getPoolName(), 3);
-
-    for (ContainerReportsRequestProto reportsProto : clist) {
-      ppool.handleContainerReport(reportsProto);
-    }
-    GenericTestUtils.waitFor(() -> ppool.getContainerProcessedCount() == 4,
-        200, 1000);
-    ppool.setDoneProcessing();
-
-    List<Map.Entry<String, Integer>> containers = ppool.filterContainer(p -> p
-        .getValue() == 1);
-    Assert.assertEquals(singleNodeContainer, containers.get(0).getKey());
-    int count = containers.get(0).getValue();
-    Assert.assertEquals(1L, count);
-  }
-
-  @Test
-  /**
-   * We create three containers: Normal, OverReplicated and WayOverReplicated
-   * containers. This test asserts that we are able to find the
-   * over replicated containers.
-   */
-  public void testDetectOverReplica() throws TimeoutException,
-      InterruptedException {
-    String normalContainer = "NormalContainer";
-    String overReplicated = "OverReplicatedContainer";
-    String wayOverReplicated = "WayOverReplicated";
-    InProgressPool ppool = replicationManager.getInProcessPoolList().get(0);
-
-    List<ContainerReportsRequestProto> clist =
-        datanodeStateManager.getContainerReport(normalContainer,
-            ppool.getPool().getPoolName(), 3);
-    ppool.handleContainerReport(clist.get(0));
-
-    clist = datanodeStateManager.getContainerReport(overReplicated,
-        ppool.getPool().getPoolName(), 4);
-
-    for (ContainerReportsRequestProto reportsProto : clist) {
-      ppool.handleContainerReport(reportsProto);
-    }
-
-    clist = datanodeStateManager.getContainerReport(wayOverReplicated,
-        ppool.getPool().getPoolName(), 7);
-
-    for (ContainerReportsRequestProto reportsProto : clist) {
-      ppool.handleContainerReport(reportsProto);
-    }
-
-    // We ignore container reports from the same datanodes.
-    // it is possible that each of these containers gets placed
-    // on the same datanodes, so allowing for 4 duplicates in the set of 14.
-    GenericTestUtils.waitFor(() -> ppool.getContainerProcessedCount() > 10,
-        200, 1000);
-    ppool.setDoneProcessing();
-
-    List<Map.Entry<String, Integer>> containers = ppool.filterContainer(p -> p
-        .getValue() > 3);
-    Assert.assertEquals(2, containers.size());
-  }
-
-  @Test
-  /**
-   * This test verifies that all pools are picked up for replica processing.
-   *
-   */
-  public void testAllPoolsAreProcessed() throws TimeoutException,
-      InterruptedException {
-    // Verify that we saw all three pools being picked up for processing.
-    GenericTestUtils.waitFor(() -> replicationManager.getPoolProcessCount()
-        >= 3, 200, 15 * 1000);
-    Assert.assertTrue(logCapturer.getOutput().contains("Pool1") &&
-        logCapturer.getOutput().contains("Pool2") &&
-        logCapturer.getOutput().contains("Pool3"));
-  }
-
-  @Test
-  /**
-   * Adds a new pool and tests that we are able to pick up that new pool for
-   * processing as well as handle container reports for datanodes in that pool.
-   * @throws TimeoutException
-   * @throws InterruptedException
-   */
-  public void testAddingNewPoolWorks()
-      throws TimeoutException, InterruptedException, IOException {
-    LogCapturer inProgressLog = LogCapturer.captureLogs(
-        LogFactory.getLog(InProgressPool.class));
-    GenericTestUtils.setLogLevel(InProgressPool.LOG, Level.DEBUG);
-    try {
-      DatanodeID id = SCMTestUtils.getDatanodeID();
-      ((ReplicationNodeManagerMock) (nodeManager)).addNode(id, HEALTHY);
-      poolManager.addNode("PoolNew", id);
-      GenericTestUtils.waitFor(() ->
-              logCapturer.getOutput().contains("PoolNew"),
-          200, 15 * 1000);
-
-      // Assert that we are able to send a container report to this new
-      // pool and datanode.
-      List<ContainerReportsRequestProto> clist =
-          datanodeStateManager.getContainerReport("NewContainer1",
-              "PoolNew", 1);
-      replicationManager.handleContainerReport(clist.get(0));
-      GenericTestUtils.waitFor(() ->
-          inProgressLog.getOutput().contains("NewContainer1") && inProgressLog
-              .getOutput().contains(id.getDatanodeUuid()), 200, 10 * 1000);
-    } finally {
-      inProgressLog.stopCapturing();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/74484754/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerSupervisor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerSupervisor.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerSupervisor.java
new file mode 100644
index 0000000..8927596
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerSupervisor.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.container.TestUtils
+    .ReplicationDatanodeStateManager;
+import org.apache.hadoop.ozone.container.TestUtils.ReplicationNodeManagerMock;
+import org.apache.hadoop.ozone.container.TestUtils
+    .ReplicationNodePoolManagerMock;
+import org.apache.hadoop.ozone.container.common.SCMTestUtils;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
+import org.apache.hadoop.ozone.scm.container.replication.ContainerSupervisor;
+import org.apache.hadoop.ozone.scm.container.replication.InProgressPool;
+import org.apache.hadoop.ozone.scm.node.CommandQueue;
+import org.apache.hadoop.ozone.scm.node.NodeManager;
+import org.apache.hadoop.ozone.scm.node.NodePoolManager;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
+import org.slf4j.event.Level;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.HEALTHY;
+import static org.apache.hadoop.scm.ScmConfigKeys
+    .OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT;
+import static org.apache.hadoop.scm.ScmConfigKeys
+    .OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL;
+import static org.apache.ratis.shaded.com.google.common.util.concurrent
+    .Uninterruptibles.sleepUninterruptibly;
+
+/**
+ * Tests for the container supervisor.
+ */
+public class TestContainerSupervisor {
+  final static String POOL_NAME_TEMPLATE = "Pool%d";
+  static final int MAX_DATANODES = 72;
+  static final int POOL_SIZE = 24;
+  static final int POOL_COUNT = 3;
+  private LogCapturer logCapturer = LogCapturer.captureLogs(
+      LogFactory.getLog(ContainerSupervisor.class));
+  private List<DatanodeID> datanodes = new LinkedList<>();
+  private NodeManager nodeManager;
+  private NodePoolManager poolManager;
+  private CommandQueue commandQueue;
+  private ContainerSupervisor containerSupervisor;
+  private ReplicationDatanodeStateManager datanodeStateManager;
+
+  @After
+  public void tearDown() throws Exception {
+    logCapturer.stopCapturing();
+    GenericTestUtils.setLogLevel(ContainerSupervisor.LOG, Level.INFO);
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    GenericTestUtils.setLogLevel(ContainerSupervisor.LOG, Level.DEBUG);
+    Map<DatanodeID, NodeState> nodeStateMap = new HashMap<>();
+    // We are setting up 3 pools with 24 nodes each in this cluster.
+    // First we create 72 Datanodes.
+    for (int x = 0; x < MAX_DATANODES; x++) {
+      DatanodeID datanode = SCMTestUtils.getDatanodeID();
+      datanodes.add(datanode);
+      nodeStateMap.put(datanode, HEALTHY);
+    }
+
+    commandQueue = new CommandQueue();
+
+    // All nodes in this cluster are healthy for the time being.
+    nodeManager = new ReplicationNodeManagerMock(nodeStateMap, commandQueue);
+    poolManager = new ReplicationNodePoolManagerMock();
+
+    Assert.assertEquals("Max datanodes should be equal to POOL_SIZE * " +
+        "POOL_COUNT", POOL_COUNT * POOL_SIZE, MAX_DATANODES);
+
+    // Pool names start from 1; each pool gets its own disjoint slice of
+    // POOL_SIZE datanodes.
+    for (int y = 1; y <= POOL_COUNT; y++) {
+      String poolName = String.format(POOL_NAME_TEMPLATE, y);
+      for (int z = 0; z < POOL_SIZE; z++) {
+        DatanodeID id = datanodes.get((y - 1) * POOL_SIZE + z);
+        poolManager.addNode(poolName, id);
+      }
+    }
+    OzoneConfiguration config = SCMTestUtils.getOzoneConf();
+    config.setTimeDuration(OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT, 2,
+        TimeUnit.SECONDS);
+    config.setTimeDuration(OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL, 1,
+        TimeUnit.SECONDS);
+    containerSupervisor = new ContainerSupervisor(config,
+        nodeManager, poolManager);
+    datanodeStateManager = new ReplicationDatanodeStateManager(nodeManager,
+        poolManager);
+    // Sleep for one second to make sure all threads get time to run.
+    sleepUninterruptibly(1, TimeUnit.SECONDS);
+  }
+
+  /**
+   * Asserts that at least one pool is picked up for processing.
+   */
+  @Test
+  public void testAssertPoolsAreProcessed() {
+    // This asserts that replication manager has started processing at least
+    // one pool.
+    Assert.assertTrue(containerSupervisor.getInProgressPoolCount() > 0);
+
+    // Since all datanodes are flagged as healthy in this test, for each
+    // datanode we must have queued a command.
+    Assert.assertEquals("Expected one queued command per datanode",
+        POOL_SIZE * containerSupervisor.getInProgressPoolCount(),
+        commandQueue.getCommandsInQueue());
+  }
+
+  /**
+   * This test sends container reports for 2 containers to a pool in progress.
+   * Asserts that we are able to find a container with a single replica and do
+   * not find a container with 3 replicas.
+   */
+  @Test
+  public void testDetectSingleContainerReplica() throws TimeoutException,
+      InterruptedException {
+    String singleNodeContainer = "SingleNodeContainer";
+    String threeNodeContainer = "ThreeNodeContainer";
+    InProgressPool ppool = containerSupervisor.getInProcessPoolList().get(0);
+    // Only a single datanode reports that "SingleNodeContainer" exists.
+    List<ContainerReportsRequestProto> clist =
+        datanodeStateManager.getContainerReport(singleNodeContainer,
+            ppool.getPool().getPoolName(), 1);
+    ppool.handleContainerReport(clist.get(0));
+
+    // Three nodes are going to report that ThreeNodeContainer exists.
+    clist = datanodeStateManager.getContainerReport(threeNodeContainer,
+        ppool.getPool().getPoolName(), 3);
+
+    for (ContainerReportsRequestProto reportsProto : clist) {
+      ppool.handleContainerReport(reportsProto);
+    }
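+    // Four reports in total were handled above: one for SingleNodeContainer
+    // and three for ThreeNodeContainer.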
+    GenericTestUtils.waitFor(() -> ppool.getContainerProcessedCount() == 4,
+        200, 1000);
+    ppool.setDoneProcessing();
+
+    List<Map.Entry<String, Integer>> containers = ppool.filterContainer(p -> p
+        .getValue() == 1);
+    Assert.assertEquals(singleNodeContainer, containers.get(0).getKey());
+    int count = containers.get(0).getValue();
+    Assert.assertEquals(1L, count);
+  }
+
+  /**
+   * We create three containers: Normal, OverReplicated and WayOverReplicated.
+   * This test asserts that we are able to find the over-replicated
+   * containers.
+   */
+  @Test
+  public void testDetectOverReplica() throws TimeoutException,
+      InterruptedException {
+    String normalContainer = "NormalContainer";
+    String overReplicated = "OverReplicatedContainer";
+    String wayOverReplicated = "WayOverReplicated";
+    InProgressPool ppool = containerSupervisor.getInProcessPoolList().get(0);
+
+    List<ContainerReportsRequestProto> clist =
+        datanodeStateManager.getContainerReport(normalContainer,
+            ppool.getPool().getPoolName(), 3);
+    ppool.handleContainerReport(clist.get(0));
+
+    clist = datanodeStateManager.getContainerReport(overReplicated,
+        ppool.getPool().getPoolName(), 4);
+
+    for (ContainerReportsRequestProto reportsProto : clist) {
+      ppool.handleContainerReport(reportsProto);
+    }
+
+    clist = datanodeStateManager.getContainerReport(wayOverReplicated,
+        ppool.getPool().getPoolName(), 7);
+
+    for (ContainerReportsRequestProto reportsProto : clist) {
+      ppool.handleContainerReport(reportsProto);
+    }
+
+    // We ignore duplicate container reports from the same datanode.
+    // It is possible that each of these containers gets placed on the same
+    // datanodes, so we allow for 4 duplicates in the set of 14 reports.
+    GenericTestUtils.waitFor(() -> ppool.getContainerProcessedCount() > 10,
+        200, 1000);
+    ppool.setDoneProcessing();
+
+    List<Map.Entry<String, Integer>> containers = ppool.filterContainer(p -> p
+        .getValue() > 3);
+    Assert.assertEquals(2, containers.size());
+  }
+
+  /**
+   * This test verifies that all pools are picked up for replica processing.
+   */
+  @Test
+  public void testAllPoolsAreProcessed() throws TimeoutException,
+      InterruptedException {
+    // Verify that we saw all three pools being picked up for processing.
+    GenericTestUtils.waitFor(() -> containerSupervisor.getPoolProcessCount()
+        >= 3, 200, 15 * 1000);
+    Assert.assertTrue(logCapturer.getOutput().contains("Pool1") &&
+        logCapturer.getOutput().contains("Pool2") &&
+        logCapturer.getOutput().contains("Pool3"));
+  }
+
+  /**
+   * Adds a new pool and tests that we are able to pick up that new pool for
+   * processing as well as handle container reports for datanodes in that pool.
+   * @throws TimeoutException
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  @Test
+  public void testAddingNewPoolWorks()
+      throws TimeoutException, InterruptedException, IOException {
+    LogCapturer inProgressLog = LogCapturer.captureLogs(
+        LogFactory.getLog(InProgressPool.class));
+    GenericTestUtils.setLogLevel(InProgressPool.LOG, Level.DEBUG);
+    try {
+      DatanodeID id = SCMTestUtils.getDatanodeID();
+      ((ReplicationNodeManagerMock) nodeManager).addNode(id, HEALTHY);
+      poolManager.addNode("PoolNew", id);
+      GenericTestUtils.waitFor(() ->
+              logCapturer.getOutput().contains("PoolNew"),
+          200, 15 * 1000);
+
+      // Assert that we are able to send a container report to this new
+      // pool and datanode.
+      List<ContainerReportsRequestProto> clist =
+          datanodeStateManager.getContainerReport("NewContainer1",
+              "PoolNew", 1);
+      containerSupervisor.handleContainerReport(clist.get(0));
+      GenericTestUtils.waitFor(() ->
+          inProgressLog.getOutput().contains("NewContainer1") && inProgressLog
+              .getOutput().contains(id.getDatanodeUuid()), 200, 10 * 1000);
+    } finally {
+      inProgressLog.stopCapturing();
+    }
+  }
+}

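The detection logic exercised by the tests above reduces to counting, per
container, how many datanodes reported a replica, and then filtering those
counts. A minimal sketch of that idea (illustrative only, not the patch's
code; the class and method names below are invented, with InProgressPool
playing the analogous role in this patch):

  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;
  import java.util.function.Predicate;
  import java.util.stream.Collectors;

  /** Illustrative replica counter keyed by container name. */
  class ReplicaCounter {
    private final Map<String, Integer> replicaCounts = new HashMap<>();

    /** Each datanode report contributes one replica per container listed. */
    void handleReport(List<String> containerNames) {
      for (String name : containerNames) {
        replicaCounts.merge(name, 1, Integer::sum);
      }
    }

    /**
     * Analogous to InProgressPool#filterContainer: select the entries that
     * match a predicate, e.g. count == 1 (a lost replica, at replication
     * factor 3) or count > 3 (over-replicated).
     */
    List<Map.Entry<String, Integer>> filter(
        Predicate<Map.Entry<String, Integer>> predicate) {
      return replicaCounts.entrySet().stream()
          .filter(predicate)
          .collect(Collectors.toList());
    }
  }
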
http://git-wip-us.apache.org/repos/asf/hadoop/blob/74484754/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java
index 270929e..1314126 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java
@@ -35,6 +35,8 @@ import org.apache.hadoop.ozone.protocol.proto
 import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeMetric;
 import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeStat;
 import org.apache.hadoop.ozone.scm.node.NodeManager;
+import org.apache.hadoop.ozone.scm.node.NodePoolManager;
+import org.mockito.Mockito;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -269,6 +271,11 @@ public class MockNodeManager implements NodeManager {
     return new SCMNodeMetric(nodeMetricMap.get(datanodeID.toString()));
   }
 
+  @Override
+  public NodePoolManager getNodePoolManager() {
+    return Mockito.mock(NodePoolManager.class);
+  }
+
   /**
    * Used for testing.
    *

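One caveat worth noting on the MockNodeManager hunk above: Mockito.mock(...)
is invoked inside the getter, so every call to getNodePoolManager() returns a
brand-new mock, and any stubbing or verification done on one returned instance
is lost on the next call. A sketch of a variant that caches the mock (an
assumption about how tests might want to use it, not part of this patch):

  // Hypothetical variant: create the mock once so callers always see the
  // same instance and Mockito stubbing survives repeated calls.
  private final NodePoolManager mockNodePoolManager =
      Mockito.mock(NodePoolManager.class);

  @Override
  public NodePoolManager getNodePoolManager() {
    return mockNodePoolManager;
  }
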
http://git-wip-us.apache.org/repos/asf/hadoop/blob/74484754/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/TestContainerMapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/TestContainerMapping.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/TestContainerMapping.java
index ac8dee9..56085e7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/TestContainerMapping.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/TestContainerMapping.java
@@ -203,8 +203,7 @@ public class TestContainerMapping {
   }
 
   @Test
-  public void testFullContainerReport() throws IOException,
-      InterruptedException {
+  public void testFullContainerReport() throws IOException {
     String containerName = UUID.randomUUID().toString();
     ContainerInfo info = createContainer(containerName);
     DatanodeID datanodeID = SCMTestUtils.getDatanodeID();
@@ -227,7 +226,13 @@ public class TestContainerMapping {
         .setContainerID(info.getContainerID());
 
     reports.add(ciBuilder.build());
-    mapping.processContainerReports(datanodeID, reportType, reports);
+
+    ContainerReportsRequestProto.Builder crBuilder =
+        ContainerReportsRequestProto.newBuilder();
+    crBuilder.setDatanodeID(datanodeID.getProtoBufMessage())
+        .setType(reportType).addAllReports(reports);
+
+    mapping.processContainerReports(crBuilder.build());
 
     ContainerInfo updatedContainer = mapping.getContainer(containerName);
     Assert.assertEquals(100000000L, updatedContainer.getNumberOfKeys());
@@ -260,7 +265,12 @@ public class TestContainerMapping {
 
     reports.add(ciBuilder.build());
 
-    mapping.processContainerReports(datanodeID, reportType, reports);
+    ContainerReportsRequestProto.Builder crBuilder =
+        ContainerReportsRequestProto.newBuilder();
+    crBuilder.setDatanodeID(datanodeID.getProtoBufMessage())
+        .setType(reportType).addAllReports(reports);
+
+    mapping.processContainerReports(crBuilder.build());
 
     ContainerInfo updatedContainer = mapping.getContainer(containerName);
     Assert.assertEquals(500000000L, updatedContainer.getNumberOfKeys());

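The two hunks above repeat the same builder sequence, so a small test helper
could fold it into one call. A sketch under the assumption that the builder
methods shown in this patch (setDatanodeID, setType, addAllReports) keep
these signatures, and that reportType and reports carry the types they are
declared with in TestContainerMapping:

  // Hypothetical helper for building a full container report request; the
  // ContainerInfo element type is assumed to come from
  // StorageContainerDatanodeProtocolProtos, as used elsewhere in this patch.
  private static ContainerReportsRequestProto buildContainerReport(
      DatanodeID datanodeID,
      ContainerReportsRequestProto.reportType type,
      List<StorageContainerDatanodeProtocolProtos.ContainerInfo> reports) {
    return ContainerReportsRequestProto.newBuilder()
        .setDatanodeID(datanodeID.getProtoBufMessage())
        .setType(type)
        .addAllReports(reports)
        .build();
  }

  // Usage in the tests above would then shrink to:
  // mapping.processContainerReports(
  //     buildContainerReport(datanodeID, reportType, reports));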
