HDDS-737. Introduce Incremental Container Report.
Contributed by Nanda kumar.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c80f753b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c80f753b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c80f753b

Branch: refs/heads/trunk
Commit: c80f753b0e95eb722a972f836c1e4d16fd823434
Parents: e1bbf7d
Author: Nanda kumar <na...@apache.org>
Authored: Thu Nov 8 18:33:38 2018 +0530
Committer: Nanda kumar <na...@apache.org>
Committed: Thu Nov 8 18:33:38 2018 +0530

----------------------------------------------------------------------
 .../hdds/scm/container/ContainerInfo.java       |   8 +
 .../statemachine/DatanodeStateMachine.java      |  17 +-
 .../common/statemachine/StateContext.java       |  13 +-
 .../CloseContainerCommandHandler.java           |  73 +++--
 .../states/endpoint/HeartbeatEndpointTask.java  |  12 +-
 .../StorageContainerDatanodeProtocol.proto      |   2 +-
 .../scm/command/CommandStatusReportHandler.java |   4 -
 .../container/CloseContainerEventHandler.java   |  27 --
 .../scm/container/CloseContainerWatcher.java    | 101 -------
 .../hdds/scm/container/ContainerManager.java    |  11 -
 .../hdds/scm/container/ContainerReplica.java    |  30 +-
 .../scm/container/ContainerReportHandler.java   | 202 +++++++++----
 .../IncrementalContainerReportHandler.java      |  98 +++++++
 .../hdds/scm/container/SCMContainerManager.java | 107 -------
 .../scm/container/states/ContainerStateMap.java |   7 -
 .../hadoop/hdds/scm/events/SCMEvents.java       |  34 +--
 .../hadoop/hdds/scm/node/DeadNodeHandler.java   |  37 ++-
 .../hadoop/hdds/scm/node/NewNodeHandler.java    |  16 +-
 .../hadoop/hdds/scm/node/NodeManager.java       |  50 +---
 .../hadoop/hdds/scm/node/NodeReportHandler.java |   2 +-
 .../hadoop/hdds/scm/node/NodeStateManager.java  |  69 +----
 .../hadoop/hdds/scm/node/SCMNodeManager.java    |  66 +----
 .../hadoop/hdds/scm/node/StaleNodeHandler.java  |   2 +-
 .../hdds/scm/node/states/NodeStateMap.java      |  67 ++---
 .../server/SCMDatanodeHeartbeatDispatcher.java  |  17 +-
 .../scm/server/SCMDatanodeProtocolServer.java   |   6 +-
 .../scm/server/StorageContainerManager.java     |  21 +-
 .../command/TestCommandStatusReportHandler.java |  14 -
 .../hdds/scm/container/MockNodeManager.java     |  66 ++---
 .../container/TestContainerReportHandler.java   |  15 +-
 .../container/TestContainerStateManager.java    |   3 +
 .../scm/container/TestSCMContainerManager.java  | 107 +------
 .../replication/TestReplicationManager.java     |  12 +-
 .../hdds/scm/node/TestDeadNodeHandler.java      |  24 +-
 .../container/TestCloseContainerWatcher.java    | 289 -------------------
 .../ozone/container/common/TestEndPoint.java    |   6 +-
 .../testutils/ReplicationNodeManagerMock.java   |  55 +---
 .../TestContainerStateManagerIntegration.java   |  11 +-
 .../hdds/scm/pipeline/TestNode2PipelineMap.java |   4 +-
 .../hdds/scm/pipeline/TestPipelineClose.java    |   4 +-
 .../commandhandler/TestBlockDeletion.java       |   6 +-
 41 files changed, 558 insertions(+), 1157 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java
index 5a9484a..edfa0f9 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java
@@ -158,10 +158,18 @@ public class ContainerInfo implements Comparator<ContainerInfo>,
     return usedBytes;
   }
 
+  public void setUsedBytes(long value) {
+    usedBytes = value;
+  }
+
   public long getNumberOfKeys() {
     return numberOfKeys;
   }
 
+  public void setNumberOfKeys(long value) {
+    numberOfKeys = value;
+  }
+
   public long getDeleteTransactionId() {
     return deleteTransactionId;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 4768cf8..12c33ff 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -168,10 +168,14 @@ public class DatanodeStateMachine implements Closeable {
             TimeUnit.MILLISECONDS);
         now = Time.monotonicNow();
         if (now < nextHB.get()) {
-          Thread.sleep(nextHB.get() - now);
+          if (!Thread.interrupted()) {
+            Thread.sleep(nextHB.get() - now);
+          }
         }
       } catch (InterruptedException e) {
-        // Ignore this exception.
+        // Someone has sent an interrupt signal. This could be because
+        // 1. Heartbeat needs to be triggered immediately, or
+        // 2. Shutdown has been initiated.
       } catch (Exception e) {
         LOG.error("Unable to finish the execution.", e);
       }
@@ -325,6 +329,15 @@ public class DatanodeStateMachine implements Closeable {
   }
 
   /**
+   * Calling this will immediately trigger a heartbeat to the SCM.
+   * This heartbeat will also include all the reports which are ready to
+   * be sent by the datanode.
+   */
+  public void triggerHeartbeat() {
+    stateMachineThread.interrupt();
+  }
+
+  /**
    * Waits for DatanodeStateMachine to exit.
    *
    * @throws InterruptedException

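The new triggerHeartbeat() works by interrupting the state machine thread
out of its inter-heartbeat sleep, as the hunk above shows. A minimal,
self-contained sketch of the same pattern (class and member names here are
illustrative, not the actual DatanodeStateMachine fields):

    import java.util.concurrent.atomic.AtomicBoolean;

    public final class HeartbeatLoop implements Runnable {

      private final AtomicBoolean running = new AtomicBoolean(true);
      private final long intervalMillis;
      private final Thread worker;

      public HeartbeatLoop(long intervalMillis) {
        this.intervalMillis = intervalMillis;
        this.worker = new Thread(this, "heartbeat-loop");
      }

      public void start() {
        worker.start();
      }

      // Equivalent of DatanodeStateMachine#triggerHeartbeat(): interrupting
      // the sleeping worker makes the next heartbeat go out immediately.
      public void triggerHeartbeat() {
        worker.interrupt();
      }

      public void stop() {
        running.set(false);
        worker.interrupt();
      }

      @Override
      public void run() {
        while (running.get()) {
          sendHeartbeat();
          try {
            // Clear a pending trigger before sleeping, mirroring the
            // Thread.interrupted() check added in the patch above.
            if (!Thread.interrupted()) {
              Thread.sleep(intervalMillis);
            }
          } catch (InterruptedException e) {
            // Interrupt means "heartbeat now" or "shutting down"; loop again.
          }
        }
      }

      private void sendHeartbeat() {
        // Placeholder for the heartbeat RPC plus any queued reports.
      }
    }
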
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index 12c196b..e928824 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -447,6 +447,11 @@ public class StateContext {
    * @param cmd - {@link SCMCommand}.
    */
   public void addCmdStatus(SCMCommand cmd) {
+    if (cmd.getType().equals(Type.closeContainerCommand)) {
+      // We will be removing CommandStatus completely.
+      // As a first step, it is removed for CloseContainerCommand.
+      return;
+    }
     CommandStatusBuilder statusBuilder;
     if (cmd.getType() == Type.deleteBlocksCommand) {
       statusBuilder = new DeleteBlockCommandStatusBuilder();
@@ -469,14 +474,6 @@ public class StateContext {
   }
 
   /**
-   * Remove object from cache in StateContext#cmdStatusMap.
-   *
-   */
-  public void removeCommandStatus(Long cmdId) {
-    cmdStatusMap.remove(cmdId);
-  }
-
-  /**
    * Updates status of a pending status command.
    * @param cmdId       command id
    * @param cmdStatusUpdater Consumer to update command status.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
index 2c3db61..591fe56 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
+import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.CloseContainerCommandProto;
@@ -63,62 +65,55 @@ public class CloseContainerCommandHandler implements CommandHandler {
       StateContext context, SCMConnectionManager connectionManager) {
     LOG.debug("Processing Close Container command.");
     invocationCount++;
-    cmdExecuted = false;
     long startTime = Time.monotonicNow();
     // TODO: define this as INVALID_CONTAINER_ID in HddsConsts.java (TBA)
     long containerID = -1;
     try {
-
-      CloseContainerCommandProto
-          closeContainerProto =
-          CloseContainerCommandProto
-              .parseFrom(command.getProtoBufMessage());
+      CloseContainerCommandProto closeContainerProto =
+          CloseContainerCommandProto.parseFrom(command.getProtoBufMessage());
       containerID = closeContainerProto.getContainerID();
-      if (container.getContainerSet().getContainer(containerID)
+      // CloseContainer operation is idempotent; if the container is already
+      // closed, then do nothing.
+      if (!container.getContainerSet().getContainer(containerID)
           .getContainerData().isClosed()) {
-        LOG.debug("Container {} is already closed", containerID);
-        // It might happen that the where the first attempt of closing the
-        // container failed with NOT_LEADER_EXCEPTION. In such cases, SCM will
-        // retry to check the container got really closed via Ratis.
-        // In such cases of the retry attempt, if the container is already
-        // closed via Ratis, we should just return.
-        cmdExecuted = true;
-        return;
-      }
-      HddsProtos.PipelineID pipelineID = closeContainerProto.getPipelineID();
-      HddsProtos.ReplicationType replicationType =
-          closeContainerProto.getReplicationType();
+        LOG.debug("Closing container {}.", containerID);
+        HddsProtos.PipelineID pipelineID = closeContainerProto.getPipelineID();
+        HddsProtos.ReplicationType replicationType =
+            closeContainerProto.getReplicationType();
 
-      ContainerProtos.ContainerCommandRequestProto.Builder request =
-          ContainerProtos.ContainerCommandRequestProto.newBuilder();
-      request.setCmdType(ContainerProtos.Type.CloseContainer);
-      request.setContainerID(containerID);
-      request.setCloseContainer(
-          ContainerProtos.CloseContainerRequestProto.getDefaultInstance());
-      request.setTraceID(UUID.randomUUID().toString());
-      request.setDatanodeUuid(
-          context.getParent().getDatanodeDetails().getUuidString());
-      // submit the close container request for the XceiverServer to handle
-      container.submitContainerRequest(
-          request.build(), replicationType, pipelineID);
+        ContainerProtos.ContainerCommandRequestProto.Builder request =
+            ContainerProtos.ContainerCommandRequestProto.newBuilder();
+        request.setCmdType(ContainerProtos.Type.CloseContainer);
+        request.setContainerID(containerID);
+        request.setCloseContainer(
+            ContainerProtos.CloseContainerRequestProto.getDefaultInstance());
+        request.setTraceID(UUID.randomUUID().toString());
+        request.setDatanodeUuid(
+            context.getParent().getDatanodeDetails().getUuidString());
+        // submit the close container request for the XceiverServer to handle
+        container.submitContainerRequest(
+            request.build(), replicationType, pipelineID);
+        // Since the container is closed, we trigger an ICR
+        IncrementalContainerReportProto icr = IncrementalContainerReportProto
+            .newBuilder()
+            .addReport(container.getContainerSet()
+                .getContainer(containerID).getContainerReport())
+            .build();
+        context.addReport(icr);
+        context.getParent().triggerHeartbeat();
+      }
     } catch (Exception e) {
       if (e instanceof NotLeaderException) {
         // If the particular datanode is not the Ratis leader, the close
         // container command will not be executed by the follower but will be
         // executed by Ratis stateMachine transactions via leader to follower.
         // There can also be case where the datanode is in candidate state.
-        // In these situations, NotLeaderException is thrown. Remove the status
-        // from cmdStatus Map here so that it will be retried only by SCM if the
-        // leader could not not close the container after a certain time.
-        context.removeCommandStatus(containerID);
-        LOG.info(e.getLocalizedMessage());
+        // In these situations, NotLeaderException is thrown.
+        LOG.info("Follower cannot close the container {}.", containerID);
       } else {
         LOG.error("Can't close container " + containerID, e);
-        cmdExecuted = false;
       }
     } finally {
-      updateCommandStatus(context, command,
-          (cmdStatus) -> cmdStatus.setStatus(cmdExecuted), LOG);
       long endTime = Time.monotonicNow();
       totalTime += endTime - startTime;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index 4fd72ec..0c0f1af 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -141,7 +141,9 @@ public class HeartbeatEndpointTask
       rpcEndpoint.zeroMissedCount();
     } catch (IOException ex) {
       // put back the reports which failed to be sent
-      putBackReports(requestBuilder);
+      if (requestBuilder != null) {
+        putBackReports(requestBuilder);
+      }
       rpcEndpoint.logIfNeeded(ex);
     } finally {
       rpcEndpoint.unlock();
@@ -159,10 +161,10 @@ public class HeartbeatEndpointTask
       reports.add(requestBuilder.getNodeReport());
     }
     if (requestBuilder.getCommandStatusReportsCount() != 0) {
-      for (GeneratedMessage msg : requestBuilder
-          .getCommandStatusReportsList()) {
-        reports.add(msg);
-      }
+      reports.addAll(requestBuilder.getCommandStatusReportsList());
+    }
+    if (requestBuilder.getIncrementalContainerReportCount() != 0) {
+      reports.addAll(requestBuilder.getIncrementalContainerReportList());
     }
     context.putBackReports(reports);
   }

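The put-back path above gives reports at-least-once delivery: anything that
failed to go out, incremental container reports included, is handed back to
the StateContext and retried on the next heartbeat. A condensed sketch of
that contract (ReportQueue is a hypothetical stand-in for StateContext):

    import com.google.protobuf.GeneratedMessage;
    import java.util.ArrayList;
    import java.util.List;

    final class ReportQueue {

      private final List<GeneratedMessage> pending = new ArrayList<>();

      // Drains everything queued since the last heartbeat.
      synchronized List<GeneratedMessage> takeAll() {
        List<GeneratedMessage> out = new ArrayList<>(pending);
        pending.clear();
        return out;
      }

      // Called when the heartbeat RPC fails: the unsent reports are simply
      // queued again, so the next heartbeat retries them.
      synchronized void putBack(List<GeneratedMessage> unsent) {
        pending.addAll(unsent);
      }
    }
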
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
index 4ddb7b2..33ea307 100644
--- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -80,7 +80,7 @@ message SCMHeartbeatRequestProto {
   required DatanodeDetailsProto datanodeDetails = 1;
   optional NodeReportProto nodeReport = 2;
   optional ContainerReportsProto containerReport = 3;
-  optional IncrementalContainerReportProto incrementalContainerReport = 4;
+  repeated IncrementalContainerReportProto incrementalContainerReport = 4;
   repeated CommandStatusReportsProto commandStatusReports = 5;
   optional ContainerActionsProto containerActions = 6;
   optional PipelineActionsProto pipelineActions = 7;

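Changing the field from optional to repeated lets a single heartbeat carry
every ICR the datanode queued since its last heartbeat, instead of at most
one. A hedged sketch of building such a request with the standard builders
protoc generates for the fields above (the helper class is hypothetical):

    import java.util.List;
    import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
    import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
    import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;

    final class HeartbeatRequests {

      // Builds a heartbeat carrying every queued ICR; with a repeated field
      // none of them has to be dropped or merged into another report.
      static SCMHeartbeatRequestProto build(DatanodeDetailsProto datanode,
          List<IncrementalContainerReportProto> queuedIcrs) {
        return SCMHeartbeatRequestProto.newBuilder()
            .setDatanodeDetails(datanode)
            .addAllIncrementalContainerReport(queuedIcrs)
            .build();
      }
    }
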
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
index c0de382..53dfc5a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
@@ -58,10 +58,6 @@ public class CommandStatusReportHandler implements
         publisher.fireEvent(SCMEvents.REPLICATION_STATUS, new
             ReplicationStatus(cmdStatus));
         break;
-      case closeContainerCommand:
-        publisher.fireEvent(SCMEvents.CLOSE_CONTAINER_STATUS, new
-            CloseContainerStatus(cmdStatus));
-        break;
       case deleteBlocksCommand:
         if (cmdStatus.getStatus() == CommandStatus.Status.EXECUTED) {
           publisher.fireEvent(SCMEvents.DELETE_BLOCK_STATUS,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
index 69574a9..9796a96 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
@@ -22,14 +22,12 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.hdds.server.events.IdentifiableEventPayload;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.DATANODE_COMMAND;
-import static org.apache.hadoop.hdds.scm.events.SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ;
 
 /**
  * In case of a node failure, volume failure, volume out of space, node
@@ -128,32 +126,7 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> {
             new CommandForDatanode<>(node.getUuid(), closeContainerCommand))
         .forEach(command -> publisher.fireEvent(DATANODE_COMMAND, command));
 
-    publisher.fireEvent(CLOSE_CONTAINER_RETRYABLE_REQ,
-        new CloseContainerRetryableReq(containerID));
-
     LOG.trace("Issuing {} on Pipeline {} for container {}",
         closeContainerCommand, pipeline, containerID);
   }
-
-  /**
-   * Class to create retryable event. Prevents redundant requests for same
-   * container Id.
-   */
-  public static class CloseContainerRetryableReq implements
-      IdentifiableEventPayload {
-
-    private ContainerID containerID;
-    public CloseContainerRetryableReq(ContainerID containerID) {
-      this.containerID = containerID;
-    }
-
-    public ContainerID getContainerID() {
-      return containerID;
-    }
-
-    @Override
-    public long getId() {
-      return containerID.getId();
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java
deleted file mode 100644
index 4593c1f..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * <p>http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * <p>Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.hdds.scm.container;
-
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
-import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
-    .CloseContainerStatus;
-
-import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.server.events.Event;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.hdds.server.events.EventWatcher;
-import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler
-    .CloseContainerRetryableReq;
-import org.apache.hadoop.ozone.lease.LeaseManager;
-import org.apache.hadoop.ozone.lease.LeaseNotFoundException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- * This watcher will watch for CLOSE_CONTAINER_STATUS events fired from
- * CommandStatusReport. If required it will re-trigger CloseContainer command
- * for DataNodes to CloseContainerEventHandler.
- */
-public class CloseContainerWatcher extends
-    EventWatcher<CloseContainerRetryableReq, CloseContainerStatus> {
-
-  public static final Logger LOG =
-      LoggerFactory.getLogger(CloseContainerWatcher.class);
-  private final ContainerManager containerManager;
-
-  public CloseContainerWatcher(Event<CloseContainerRetryableReq> startEvent,
-      Event<CloseContainerStatus> completionEvent,
-      LeaseManager<Long> leaseManager, ContainerManager containerManager) {
-    super(startEvent, completionEvent, leaseManager);
-    this.containerManager = containerManager;
-  }
-
-  @Override
-  protected void onTimeout(EventPublisher publisher,
-      CloseContainerRetryableReq payload) {
-    // Let CloseContainerEventHandler handle this message.
-    this.resendEventToHandler(payload.getId(), publisher);
-  }
-
-  @Override
-  protected void onFinished(EventPublisher publisher,
-      CloseContainerRetryableReq payload) {
-    LOG.trace("CloseContainerCommand for containerId: {} executed ", payload
-        .getContainerID().getId());
-  }
-
-  @Override
-  protected synchronized void handleCompletion(CloseContainerStatus status,
-      EventPublisher publisher) throws LeaseNotFoundException {
-    // If status is PENDING then return without doing anything.
-    if(status.getCmdStatus().getStatus().equals(Status.PENDING)){
-      return;
-    }
-
-    CloseContainerRetryableReq closeCont = getTrackedEventbyId(status.getId());
-    super.handleCompletion(status, publisher);
-    // If status is FAILED then send a msg to Handler to resend the command.
-    if (status.getCmdStatus().getStatus().equals(Status.FAILED) && closeCont
-        != null) {
-      this.resendEventToHandler(closeCont.getId(), publisher);
-    }
-  }
-
-  private void resendEventToHandler(long containerID, EventPublisher
-      publisher) {
-    try {
-      // Check if container is still open
-      if (containerManager.getContainer(
-          ContainerID.valueof(containerID)).isOpen()) {
-        publisher.fireEvent(SCMEvents.CLOSE_CONTAINER,
-            ContainerID.valueof(containerID));
-      }
-    } catch (IOException e) {
-      LOG.warn("Error in CloseContainerWatcher while processing event " +
-          "for containerId {} ExceptionMsg: ", containerID, e.getMessage());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
index 0a48915..0906ca8 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
@@ -16,14 +16,11 @@
  */
 package org.apache.hadoop.hdds.scm.container;
 
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
 
 import java.io.Closeable;
@@ -123,14 +120,6 @@ public interface ContainerManager extends Closeable {
       HddsProtos.LifeCycleEvent event) throws IOException;
 
   /**
-   * Process container report from Datanode.
-   *
-   * @param reports Container report
-   */
-  void processContainerReports(DatanodeDetails datanodeDetails,
-      ContainerReportsProto reports) throws IOException;
-
-  /**
    * Returns the latest list of replicas for given containerId.
    *
    * @param containerID Container ID

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java
index f2e80f4..9445fe8 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java
@@ -22,6 +22,8 @@ import org.apache.commons.lang3.builder.CompareToBuilder;
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 
 import java.util.Optional;
 import java.util.UUID;
@@ -32,15 +34,18 @@ import java.util.UUID;
 public final class ContainerReplica implements Comparable<ContainerReplica> {
 
   final private ContainerID containerID;
+  final private ContainerReplicaProto.State state;
   final private DatanodeDetails datanodeDetails;
   final private UUID placeOfBirth;
 
   private Long sequenceId;
 
 
-  private ContainerReplica(ContainerID containerID, DatanodeDetails datanode,
-      UUID originNodeId) {
+  private ContainerReplica(final ContainerID containerID,
+      final ContainerReplicaProto.State state, final DatanodeDetails datanode,
+      final UUID originNodeId) {
     this.containerID = containerID;
+    this.state = state;
     this.datanodeDetails = datanode;
     this.placeOfBirth = originNodeId;
   }
@@ -68,6 +73,15 @@ public final class ContainerReplica implements Comparable<ContainerReplica> {
   }
 
   /**
+   * Returns the state of this replica.
+   *
+   * @return replica state
+   */
+  public ContainerReplicaProto.State getState() {
+    return state;
+  }
+
+  /**
    * Returns the Sequence Id of this replica.
    *
    * @return Sequence Id
@@ -126,6 +140,7 @@ public final class ContainerReplica implements Comparable<ContainerReplica> {
   public static class ContainerReplicaBuilder {
 
     private ContainerID containerID;
+    private ContainerReplicaProto.State state;
     private DatanodeDetails datanode;
     private UUID placeOfBirth;
     private Long sequenceId;
@@ -142,6 +157,12 @@ public final class ContainerReplica implements Comparable<ContainerReplica> {
       return this;
     }
 
+    public ContainerReplicaBuilder setContainerState(
+        final ContainerReplicaProto.State  containerState) {
+      state = containerState;
+      return this;
+    }
+
     /**
      * Set DatanodeDetails.
      *
@@ -184,9 +205,12 @@ public final class ContainerReplica implements Comparable<ContainerReplica> {
     public ContainerReplica build() {
       Preconditions.checkNotNull(containerID,
           "Container Id can't be null");
+      Preconditions.checkNotNull(state,
+          "Container state can't be null");
       Preconditions.checkNotNull(datanode,
           "DatanodeDetails can't be null");
-      ContainerReplica replica = new ContainerReplica(containerID, datanode,
+      ContainerReplica replica = new ContainerReplica(
+          containerID, state, datanode,
           Optional.ofNullable(placeOfBirth).orElse(datanode.getUuid()));
       Optional.ofNullable(sequenceId).ifPresent(replica::setSequenceId);
       return replica;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
index 0cb2f81..c9fe9e4 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
@@ -17,19 +17,26 @@
  */
 package org.apache.hadoop.hdds.scm.container;
 
-import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationActivityStatus;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;
+import org.apache.hadoop.hdds.scm.container.replication
+    .ReplicationActivityStatus;
 import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.scm.node.states.ReportResult;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.server
+    .SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 
@@ -47,73 +54,160 @@ public class ContainerReportHandler implements
       LoggerFactory.getLogger(ContainerReportHandler.class);
 
   private final NodeManager nodeManager;
+  private final PipelineManager pipelineManager;
   private final ContainerManager containerManager;
-  private ReplicationActivityStatus replicationStatus;
+  private final ReplicationActivityStatus replicationStatus;
 
-  public ContainerReportHandler(ContainerManager containerManager,
-      NodeManager nodeManager,
-      ReplicationActivityStatus replicationActivityStatus) {
-    Preconditions.checkNotNull(containerManager);
+  public ContainerReportHandler(final NodeManager nodeManager,
+      final PipelineManager pipelineManager,
+      final ContainerManager containerManager,
+      final ReplicationActivityStatus replicationActivityStatus) {
     Preconditions.checkNotNull(nodeManager);
+    Preconditions.checkNotNull(pipelineManager);
+    Preconditions.checkNotNull(containerManager);
     Preconditions.checkNotNull(replicationActivityStatus);
     this.nodeManager = nodeManager;
+    this.pipelineManager = pipelineManager;
     this.containerManager = containerManager;
     this.replicationStatus = replicationActivityStatus;
   }
 
   @Override
-  public void onMessage(ContainerReportFromDatanode containerReportFromDatanode,
-      EventPublisher publisher) {
+  public void onMessage(final ContainerReportFromDatanode reportFromDatanode,
+      final EventPublisher publisher) {
+
+    final DatanodeDetails datanodeDetails =
+        reportFromDatanode.getDatanodeDetails();
 
-    DatanodeDetails datanodeOrigin =
-        containerReportFromDatanode.getDatanodeDetails();
+    final ContainerReportsProto containerReport =
+        reportFromDatanode.getReport();
 
-    ContainerReportsProto containerReport =
-        containerReportFromDatanode.getReport();
     try {
 
-      //update state in container db and trigger close container events
-      containerManager
-          .processContainerReports(datanodeOrigin, containerReport);
-
-      Set<ContainerID> containerIds = containerReport.getReportsList().stream()
-          .map(StorageContainerDatanodeProtocolProtos
-              .ContainerReplicaProto::getContainerID)
-          .map(ContainerID::new)
-          .collect(Collectors.toSet());
-
-      ReportResult<ContainerID> reportResult = nodeManager
-          .processContainerReport(datanodeOrigin.getUuid(), containerIds);
-
-      //we have the report, so we can update the states for the next iteration.
-      nodeManager
-          .setContainersForDatanode(datanodeOrigin.getUuid(), containerIds);
-
-      for (ContainerID containerID : reportResult.getMissingEntries()) {
-        final ContainerReplica replica = ContainerReplica.newBuilder()
-            .setContainerID(containerID)
-            .setDatanodeDetails(datanodeOrigin)
-            .build();
-        containerManager
-            .removeContainerReplica(containerID, replica);
-        checkReplicationState(containerID, publisher);
+      final List<ContainerReplicaProto> replicas = containerReport
+          .getReportsList();
+
+      // ContainerIDs which SCM expects this datanode to have.
+      final Set<ContainerID> expectedContainerIDs = nodeManager
+          .getContainers(datanodeDetails);
+
+      // ContainerIDs that this datanode actually has.
+      final Set<ContainerID> actualContainerIDs = replicas.parallelStream()
+          .map(ContainerReplicaProto::getContainerID)
+          .map(ContainerID::valueof).collect(Collectors.toSet());
+
+      // Container replicas which SCM is not aware of.
+      final Set<ContainerID> newReplicas =
+          new HashSet<>(actualContainerIDs);
+      newReplicas.removeAll(expectedContainerIDs);
+
+      // Container replicas which are missing from datanode.
+      final Set<ContainerID> missingReplicas =
+          new HashSet<>(expectedContainerIDs);
+      missingReplicas.removeAll(actualContainerIDs);
+
+      processContainerReplicas(datanodeDetails, replicas, publisher);
+
+      // Remove missing replica from ContainerManager
+      for (ContainerID id : missingReplicas) {
+        try {
+          containerManager.getContainerReplicas(id)
+              .stream()
+              .filter(replica ->
+                  replica.getDatanodeDetails().equals(datanodeDetails))
+              .findFirst()
+              .ifPresent(replica -> {
+                try {
+                  containerManager.removeContainerReplica(id, replica);
+                } catch (ContainerNotFoundException |
+                    ContainerReplicaNotFoundException e) {
+                  // This should not happen, but even if it does, it is
+                  // not an issue.
+                }
+              });
+        } catch (ContainerNotFoundException e) {
+          LOG.warn("Cannot remove container replica, container {} not found",
+              id);
+        }
       }
 
-      for (ContainerID containerID : reportResult.getNewEntries()) {
-        final ContainerReplica replica = ContainerReplica.newBuilder()
-            .setContainerID(containerID)
-            .setDatanodeDetails(datanodeOrigin)
-            .build();
-        containerManager.updateContainerReplica(containerID, replica);
-        checkReplicationState(containerID, publisher);
-      }
+      // Update the latest set of containers for this datanode in NodeManager.
+      nodeManager.setContainers(datanodeDetails, actualContainerIDs);
+
+      // Replicate if needed.
+      newReplicas.forEach(id -> checkReplicationState(id, publisher));
+      missingReplicas.forEach(id -> checkReplicationState(id, publisher));
+
+    } catch (NodeNotFoundException ex) {
+      LOG.error("Received container report from unknown datanode {}",
+          datanodeDetails);
+    }
+
+  }
 
-    } catch (IOException e) {
-      //TODO: stop all the replication?
-      LOG.error("Error on processing container report from datanode {}",
-          datanodeOrigin, e);
+  private void processContainerReplicas(final DatanodeDetails datanodeDetails,
+      final List<ContainerReplicaProto> replicas,
+      final EventPublisher publisher) {
+    final PendingDeleteStatusList pendingDeleteStatusList =
+        new PendingDeleteStatusList(datanodeDetails);
+    for (ContainerReplicaProto replicaProto : replicas) {
+      try {
+        final ContainerID containerID = ContainerID.valueof(
+            replicaProto.getContainerID());
+        final ContainerInfo containerInfo = containerManager
+            .getContainer(containerID);
+        updateContainerState(datanodeDetails, containerInfo,
+            replicaProto, publisher);
+        if (containerInfo.getDeleteTransactionId() >
+            replicaProto.getDeleteTransactionId()) {
+          pendingDeleteStatusList
+              .addPendingDeleteStatus(replicaProto.getDeleteTransactionId(),
+                  containerInfo.getDeleteTransactionId(),
+                  containerInfo.getContainerID());
+        }
+      } catch (ContainerNotFoundException e) {
+        LOG.error("Received container report for an unknown container {}",
+            replicaProto.getContainerID());
+      }
     }
+    if (pendingDeleteStatusList.getNumPendingDeletes() > 0) {
+      publisher.fireEvent(SCMEvents.PENDING_DELETE_STATUS,
+          pendingDeleteStatusList);
+    }
+  }
 
+  private void updateContainerState(final DatanodeDetails datanodeDetails,
+      final ContainerInfo containerInfo,
+      final ContainerReplicaProto replicaProto,
+      final EventPublisher publisher)
+      throws ContainerNotFoundException {
+
+    final ContainerID id = containerInfo.containerID();
+    final ContainerReplica datanodeContainerReplica = ContainerReplica
+        .newBuilder()
+        .setContainerID(id)
+        .setContainerState(replicaProto.getState())
+        .setDatanodeDetails(datanodeDetails)
+        .build();
+    // TODO: Add bcsid and origin datanode to replica.
+
+    final ContainerReplica scmContainerReplica = containerManager
+        .getContainerReplicas(id)
+        .stream()
+        .filter(replica ->
+            replica.getDatanodeDetails().equals(datanodeDetails))
+        .findFirst().orElse(null);
+
+    // This is an in-memory update.
+    containerManager.updateContainerReplica(id, datanodeContainerReplica);
+    containerInfo.setUsedBytes(replicaProto.getUsed());
+    containerInfo.setNumberOfKeys(replicaProto.getKeyCount());
+
+    // Check if there is state change in container replica.
+    if (scmContainerReplica == null ||
+        scmContainerReplica.getState() != datanodeContainerReplica.getState()) {
+      // TODO: Handle replica state change.
+    }
   }
 
   private void checkReplicationState(ContainerID containerID,

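The reconciliation in onMessage above boils down to two set differences
between the replicas SCM expects on the datanode and the replicas the
datanode actually reported. A self-contained illustration of the same
computation, with container IDs reduced to longs and made-up values:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    final class ReportReconciliation {

      public static void main(String[] args) {
        // What SCM expects this datanode to have vs. what it reported.
        Set<Long> expected = new HashSet<>(Arrays.asList(1L, 2L, 3L));
        Set<Long> actual = new HashSet<>(Arrays.asList(2L, 3L, 4L));

        // Replicas SCM is not yet aware of -> update ContainerManager.
        Set<Long> newReplicas = new HashSet<>(actual);
        newReplicas.removeAll(expected);          // {4}

        // Replicas missing from the datanode -> remove, maybe re-replicate.
        Set<Long> missingReplicas = new HashSet<>(expected);
        missingReplicas.removeAll(actual);        // {1}

        System.out.println("new=" + newReplicas
            + " missing=" + missingReplicas);
      }
    }
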
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java
new file mode 100644
index 0000000..e07ee73
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.server
+    .SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Handles incremental container reports from datanode.
+ */
+public class IncrementalContainerReportHandler implements
+    EventHandler<IncrementalContainerReportFromDatanode> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(IncrementalContainerReportHandler.class);
+
+  private final PipelineManager pipelineManager;
+  private final ContainerManager containerManager;
+
+  public IncrementalContainerReportHandler(
+      final PipelineManager pipelineManager,
+      final ContainerManager containerManager)  {
+    Preconditions.checkNotNull(pipelineManager);
+    Preconditions.checkNotNull(containerManager);
+    this.pipelineManager = pipelineManager;
+    this.containerManager = containerManager;
+  }
+
+  @Override
+  public void onMessage(
+      final IncrementalContainerReportFromDatanode containerReportFromDatanode,
+      final EventPublisher publisher) {
+
+    for (ContainerReplicaProto replicaProto :
+         containerReportFromDatanode.getReport().getReportList()) {
+      try {
+        final DatanodeDetails datanodeDetails = containerReportFromDatanode
+            .getDatanodeDetails();
+        final ContainerID containerID = ContainerID
+            .valueof(replicaProto.getContainerID());
+        final ContainerInfo containerInfo = containerManager
+            .getContainer(containerID);
+
+        ContainerReplica replica = ContainerReplica.newBuilder()
+            .setContainerID(ContainerID.valueof(replicaProto.getContainerID()))
+            .setContainerState(replicaProto.getState())
+            .setDatanodeDetails(datanodeDetails)
+            .build();
+
+        containerManager.updateContainerReplica(containerID, replica);
+
+        // Check if the state of the container is changed.
+        if (replicaProto.getState() == ContainerReplicaProto.State.CLOSED &&
+            containerInfo.getState() == HddsProtos.LifeCycleState.CLOSING) {
+          containerManager.updateContainerState(containerID,
+              HddsProtos.LifeCycleEvent.CLOSE);
+        }
+
+        // TODO: Handle replica state change
+
+      } catch (ContainerNotFoundException e) {
+        LOG.warn("Container {} not found!", replicaProto.getContainerID());
+      } catch (IOException e) {
+        LOG.error("Exception while processing ICR for container {}",
+            replicaProto.getContainerID(), e);
+      }
+    }
+
+  }
+}

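A handler like this only takes effect once it is registered for the new
event type. A hedged wiring sketch (the real registration happens in
StorageContainerManager, which this patch also touches; the helper class
here is hypothetical):

    import org.apache.hadoop.hdds.scm.container.ContainerManager;
    import org.apache.hadoop.hdds.scm.container.IncrementalContainerReportHandler;
    import org.apache.hadoop.hdds.scm.events.SCMEvents;
    import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
    import org.apache.hadoop.hdds.server.events.EventQueue;

    final class IcrWiring {

      // Routes every INCREMENTAL_CONTAINER_REPORT event dispatched by
      // SCMDatanodeHeartbeatDispatcher to the new handler.
      static void wire(EventQueue queue, PipelineManager pipelineManager,
          ContainerManager containerManager) {
        queue.addHandler(SCMEvents.INCREMENTAL_CONTAINER_REPORT,
            new IncrementalContainerReportHandler(
                pipelineManager, containerManager));
      }
    }
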
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
index 0ea749f..86f1f9c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
@@ -23,13 +23,10 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
-import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
@@ -37,10 +34,6 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.server.ServerUtils;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.ozone.OzoneConsts;
@@ -489,106 +482,6 @@ public class SCMContainerManager implements ContainerManager {
   }
 
   /**
-   * Process container report from Datanode.
-   * <p>
-   * Processing follows a very simple logic for time being.
-   * <p>
-   * 1. Datanodes report the current State -- denoted by the datanodeState
-   * <p>
-   * 2. We are the older SCM state from the Database -- denoted by
-   * the knownState.
-   * <p>
-   * 3. We copy the usage etc. from currentState to newState and log that
-   * newState to the DB. This allows us SCM to bootup again and read the
-   * state of the world from the DB, and then reconcile the state from
-   * container reports, when they arrive.
-   *
-   * @param reports Container report
-   */
-  @Override
-  public void processContainerReports(DatanodeDetails datanodeDetails,
-      ContainerReportsProto reports) throws IOException {
-    List<ContainerReplicaProto>
-        containerInfos = reports.getReportsList();
-    PendingDeleteStatusList pendingDeleteStatusList =
-        new PendingDeleteStatusList(datanodeDetails);
-    for (ContainerReplicaProto newInfo :
-        containerInfos) {
-      ContainerID id = ContainerID.valueof(newInfo.getContainerID());
-      ContainerReplica replica = ContainerReplica.newBuilder()
-          .setContainerID(id)
-          .setDatanodeDetails(datanodeDetails)
-          .setOriginNodeId(datanodeDetails.getUuid())
-          .build();
-      lock.lock();
-      try {
-        containerStateManager.updateContainerReplica(id, replica);
-        ContainerInfo currentInfo = containerStateManager.getContainer(id);
-        if (newInfo.getState() == ContainerReplicaProto.State.CLOSED
-            && currentInfo.getState() == LifeCycleState.CLOSING) {
-          currentInfo = updateContainerStateInternal(id, LifeCycleEvent.CLOSE);
-          if (!currentInfo.isOpen()) {
-            pipelineManager.removeContainerFromPipeline(
-                currentInfo.getPipelineID(), id);
-          }
-        }
-
-        ContainerInfoProto newState =
-            reconcileState(newInfo, currentInfo);
-
-        if (currentInfo.getDeleteTransactionId() >
-            newInfo.getDeleteTransactionId()) {
-          pendingDeleteStatusList
-                .addPendingDeleteStatus(newInfo.getDeleteTransactionId(),
-                    currentInfo.getDeleteTransactionId(),
-                    currentInfo.getContainerID());
-        }
-        containerStateManager.updateContainerInfo(
-            ContainerInfo.fromProtobuf(newState));
-        containerStore.put(id.getBytes(), newState.toByteArray());
-      } catch (ContainerNotFoundException e) {
-        LOG.error("Error while processing container report from datanode :" +
-                " {}, for container: {}, reason: container doesn't exist in" +
-                "container database.", datanodeDetails, id);
-      } finally {
-        lock.unlock();
-      }
-    }
-    if (pendingDeleteStatusList.getNumPendingDeletes() > 0) {
-      eventPublisher.fireEvent(SCMEvents.PENDING_DELETE_STATUS,
-          pendingDeleteStatusList);
-    }
-
-  }
-
-  /**
-   * Reconciles the state from Datanode with the state in SCM.
-   *
-   * @param datanodeState - State from the Datanode.
-   * @param knownState - State inside SCM.
-   * @return new SCM State for this container.
-   */
-  private HddsProtos.ContainerInfoProto reconcileState(
-      ContainerReplicaProto datanodeState,
-      ContainerInfo knownState) {
-    HddsProtos.ContainerInfoProto.Builder builder =
-        HddsProtos.ContainerInfoProto.newBuilder();
-    builder.setContainerID(knownState.getContainerID())
-        .setPipelineID(knownState.getPipelineID().getProtobuf())
-        .setState(knownState.getState())
-        .setReplicationType(knownState.getReplicationType())
-        .setReplicationFactor(knownState.getReplicationFactor())
-        .setUsedBytes(datanodeState.getUsed())
-        .setNumberOfKeys(datanodeState.getKeyCount())
-        .setStateEnterTime(knownState.getStateEnterTime())
-        .setDeleteTransactionId(knownState.getDeleteTransactionId());
-    if (knownState.getOwner() != null) {
-      builder.setOwner(knownState.getOwner());
-    }
-    return builder.build();
-  }
-
-  /**
    * Returns the latest list of DataNodes where replica for given containerId
    * exist. Throws an SCMException if no entry is found for given containerId.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
index b8052a4..0c738b2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hdds.scm.container.states;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -263,12 +262,6 @@ public class ContainerStateMap {
     }
   }
 
-  @VisibleForTesting
-  // TODO: fix the test case and remove this method!
-  public static Logger getLOG() {
-    return LOG;
-  }
-
   /**
    * Just update the container State.
    * @param info ContainerInfo.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
index 30a7c34..72d416b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
@@ -21,15 +21,13 @@ package org.apache.hadoop.hdds.scm.events;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;
-import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
-    .CloseContainerStatus;
 import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler;
 import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
     .ReplicationStatus;
-import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler
-        .CloseContainerRetryableReq;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
+    .IncrementalContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
         .PipelineReportFromDatanode;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
     .PipelineActionsFromDatanode;
@@ -79,6 +77,16 @@ public final class SCMEvents {
       new TypedEvent<>(ContainerReportFromDatanode.class, "Container_Report");
 
   /**
+   * IncrementalContainerReports are sent out by Datanodes.
+   * This report is received by SCMDatanodeHeartbeatDispatcher and
+   * Incremental_Container_Report Event is generated.
+   */
+  public static final TypedEvent<IncrementalContainerReportFromDatanode>
+      INCREMENTAL_CONTAINER_REPORT = new TypedEvent<>(
+      IncrementalContainerReportFromDatanode.class,
+      "Incremental_Container_Report");
+
+  /**
    * ContainerActions are sent by Datanode. This event is received by
    * SCMDatanodeHeartbeatDispatcher and CONTAINER_ACTIONS event is generated.
    */
@@ -138,16 +146,6 @@ public final class SCMEvents {
       new TypedEvent<>(ContainerID.class, "Close_Container");
 
   /**
-   * A CLOSE_CONTAINER_RETRYABLE_REQ will be triggered by
-   * CloseContainerEventHandler after sending a SCMCommand to DataNode.
-   * CloseContainerWatcher will track this event. Watcher will be responsible
-   * for retrying it in event of failure or timeout.
-   */
-  public static final TypedEvent<CloseContainerRetryableReq>
-      CLOSE_CONTAINER_RETRYABLE_REQ = new TypedEvent<>(
-      CloseContainerRetryableReq.class, "Close_Container_Retryable");
-
-  /**
    * This event will be triggered whenever a new datanode is registered with
    * SCM.
    */
@@ -176,14 +174,6 @@ public final class SCMEvents {
       TypedEvent<>(ReplicationStatus.class, "Replicate_Command_Status");
   /**
    * This event will be triggered by CommandStatusReportHandler whenever a
-   * status for CloseContainer SCMCommand is received.
-   */
-  public static final Event<CloseContainerStatus>
-      CLOSE_CONTAINER_STATUS =
-      new TypedEvent<>(CloseContainerStatus.class,
-          "Close_Container_Command_Status");
-  /**
-   * This event will be triggered by CommandStatusReportHandler whenever a
    * status for DeleteBlock SCMCommand is received.
    */
   public static final TypedEvent<CommandStatusReportHandler.DeleteBlockStatus>

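For illustration, a minimal sketch of how the new event can be wired on the HDDS EventQueue; the IncrementalContainerReportHandler constructor arguments are an assumption here, since only the event definition appears above.

    // Sketch: subscribe a handler to the new typed event, then publish a
    // payload. Handler constructor arguments are assumed for illustration.
    EventQueue eventQueue = new EventQueue();
    eventQueue.addHandler(SCMEvents.INCREMENTAL_CONTAINER_REPORT,
        new IncrementalContainerReportHandler(nodeManager, containerManager));
    eventQueue.fireEvent(SCMEvents.INCREMENTAL_CONTAINER_REPORT,
        new IncrementalContainerReportFromDatanode(datanodeDetails, report));
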
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
index 1030428..43f0167 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 
@@ -61,7 +62,14 @@ public class DeadNodeHandler implements EventHandler<DatanodeDetails> {
     // TODO: check if there are any pipeline on this node and fire close
     // pipeline event
     Set<ContainerID> ids =
-        nodeManager.getContainers(datanodeDetails.getUuid());
+        null;
+    try {
+      ids = nodeManager.getContainers(datanodeDetails);
+    } catch (NodeNotFoundException e) {
+      // This should not happen, we cannot get a dead node event for an
+      // unregistered node!
+      LOG.error("DeadNode event for a unregistered node: {}!", 
datanodeDetails);
+    }
     if (ids == null) {
       LOG.info("There's no containers in dead datanode {}, no replica will be"
           + " removed from the in-memory state.", datanodeDetails.getUuid());
@@ -72,18 +80,23 @@ public class DeadNodeHandler implements 
EventHandler<DatanodeDetails> {
     for (ContainerID id : ids) {
       try {
         final ContainerInfo container = containerManager.getContainer(id);
+        // TODO: For open containers, trigger close on other nodes
+        // TODO: Check replica count and call replication manager
+        // on these containers.
         if (!container.isOpen()) {
-          final ContainerReplica replica = ContainerReplica.newBuilder()
-              .setContainerID(id)
-              .setDatanodeDetails(datanodeDetails)
-              .build();
-          try {
-            containerManager.removeContainerReplica(id, replica);
-            replicateIfNeeded(container, publisher);
-          } catch (ContainerException ex) {
-            LOG.warn("Exception while removing container replica #{} for " +
-                "container #{}.", replica, container, ex);
-          }
+          Set<ContainerReplica> replicas = containerManager
+              .getContainerReplicas(id);
+          replicas.stream()
+              .filter(r -> r.getDatanodeDetails().equals(datanodeDetails))
+              .findFirst()
+              .ifPresent(replica -> {
+                try {
+                  containerManager.removeContainerReplica(id, replica);
+                } catch (ContainerException ex) {
+                  LOG.warn("Exception while removing container replica #{} " +
+                      "for container #{}.", replica, container, ex);
+                }
+              });
         }
       } catch (ContainerNotFoundException cnfe) {
         LOG.warn("Container Not found!", cnfe);

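The reworked dead-node path can be exercised end to end through the event queue; a hedged sketch, with the handler's constructor wiring assumed:

    // Sketch: on DEAD_NODE the handler asks NodeManager for the node's
    // containers and removes only that node's replica for non-open containers.
    EventQueue queue = new EventQueue();
    queue.addHandler(SCMEvents.DEAD_NODE,
        new DeadNodeHandler(nodeManager, containerManager)); // ctor assumed
    queue.fireEvent(SCMEvents.DEAD_NODE, datanodeDetails);
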
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java
index 780aa2b..ed4fdba 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java
@@ -19,31 +19,17 @@
 package org.apache.hadoop.hdds.scm.node;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 
-import java.util.Collections;
-
 /**
  * Handles New Node event.
  */
 public class NewNodeHandler implements EventHandler<DatanodeDetails> {
 
-  private final NodeManager nodeManager;
-
-  public NewNodeHandler(NodeManager nodeManager) {
-    this.nodeManager = nodeManager;
-  }
-
   @Override
   public void onMessage(DatanodeDetails datanodeDetails,
                         EventPublisher publisher) {
-    try {
-      nodeManager.addDatanodeInContainerMap(datanodeDetails.getUuid(),
-          Collections.emptySet());
-    } catch (SCMException e) {
-      // TODO: log exception message.
-    }
+    // We currently have nothing to do when we receive a new node event.
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
index d55ff98..e944634 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
-import org.apache.hadoop.hdds.scm.node.states.ReportResult;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
@@ -63,13 +62,6 @@ import java.util.UUID;
  */
 public interface NodeManager extends StorageContainerNodeProtocol,
     EventHandler<CommandForDatanode>, NodeManagerMXBean, Closeable {
-  /**
-   * Removes a data node from the management of this Node Manager.
-   *
-   * @param node - DataNode.
-   * @throws NodeNotFoundException
-   */
-  void removeNode(DatanodeDetails node) throws NodeNotFoundException;
 
   /**
   * Gets all Live Datanodes that are currently communicating with SCM.
@@ -102,6 +94,7 @@ public interface NodeManager extends StorageContainerNodeProtocol,
    * Return a map of node stats.
    * @return a map of individual node stats (live/stale but not dead).
    */
+  // TODO: try to change the return type to Map<DatanodeDetails, SCMNodeStat>
   Map<UUID, SCMNodeStat> getNodeStats();
 
   /**
@@ -121,10 +114,10 @@ public interface NodeManager extends StorageContainerNodeProtocol,
 
   /**
    * Get set of pipelines a datanode is part of.
-   * @param dnId - datanodeID
+   * @param datanodeDetails DatanodeDetails
    * @return Set of PipelineID
    */
-  Set<PipelineID> getPipelineByDnID(UUID dnId);
+  Set<PipelineID> getPipelines(DatanodeDetails datanodeDetails);
 
   /**
    * Add pipeline information in the NodeManager.
@@ -139,40 +132,22 @@ public interface NodeManager extends StorageContainerNodeProtocol,
   void removePipeline(Pipeline pipeline);
 
   /**
-   * Update set of containers available on a datanode.
-   * @param uuid - DatanodeID
+   * Remaps the datanode-to-containers mapping to the new set of containers.
+   * @param datanodeDetails - DatanodeDetails
    * @param containerIds - Set of containerIDs
-   * @throws SCMException - if datanode is not known. For new datanode use
-   *                        addDatanodeInContainerMap call.
+   * @throws NodeNotFoundException - if datanode is not known.
    */
-  void setContainersForDatanode(UUID uuid, Set<ContainerID> containerIds)
-      throws SCMException;
-
-  /**
-   * Process containerReport received from datanode.
-   * @param uuid - DataonodeID
-   * @param containerIds - Set of containerIDs
-   * @return The result after processing containerReport
-   */
-  ReportResult<ContainerID> processContainerReport(UUID uuid,
-      Set<ContainerID> containerIds);
+  void setContainers(DatanodeDetails datanodeDetails,
+      Set<ContainerID> containerIds) throws NodeNotFoundException;
 
   /**
    * Return set of containerIDs available on a datanode.
-   * @param uuid - DatanodeID
-   * @return - set of containerIDs
-   */
-  Set<ContainerID> getContainers(UUID uuid);
-
-  /**
-   * Insert a new datanode with set of containerIDs for containers available
-   * on it.
-   * @param uuid - DatanodeID
-   * @param containerIDs - Set of ContainerIDs
-   * @throws SCMException - if datanode already exists
+   * @param datanodeDetails DatanodeDetails
+   * @return set of containerIDs
    */
-  void addDatanodeInContainerMap(UUID uuid, Set<ContainerID> containerIDs)
-      throws SCMException;
+  Set<ContainerID> getContainers(DatanodeDetails datanodeDetails)
+      throws NodeNotFoundException;
 
   /**
    * Add a {@link SCMCommand} to the command queue, which are
@@ -188,7 +163,7 @@ public interface NodeManager extends StorageContainerNodeProtocol,
    * @param dnUuid
    * @param nodeReport
    */
-  void processNodeReport(UUID dnUuid, NodeReportProto nodeReport);
+  void processNodeReport(DatanodeDetails dnUuid, NodeReportProto nodeReport);
 
   /**
    * Process a dead node event in this Node Manager.
@@ -202,5 +177,6 @@ public interface NodeManager extends StorageContainerNodeProtocol,
    * @param dnID - Datanode uuid.
    * @return list of commands
    */
+  // TODO: We can give a better name to this method!
   List<SCMCommand> getCommandQueue(UUID dnID);
 }

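The lookups above are now keyed by DatanodeDetails and signal unknown nodes with a checked exception; a minimal usage sketch against any NodeManager implementation:

    // Sketch: DatanodeDetails-based lookups with explicit not-found handling.
    try {
      Set<ContainerID> containers = nodeManager.getContainers(datanodeDetails);
      Set<PipelineID> pipelines = nodeManager.getPipelines(datanodeDetails);
      nodeManager.setContainers(datanodeDetails, containers);
    } catch (NodeNotFoundException e) {
      // The datanode has not registered with SCM.
    }
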
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeReportHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeReportHandler.java
index 331bfed..f419764 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeReportHandler.java
@@ -50,6 +50,6 @@ public class NodeReportHandler implements EventHandler<NodeReportFromDatanode> {
         + "missing DatanodeDetails.");
     LOGGER.trace("Processing node report for dn: {}", dn);
     nodeManager
-        .processNodeReport(dn.getUuid(), nodeReportFromDatanode.getReport());
+        .processNodeReport(dn, nodeReportFromDatanode.getReport());
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
index 588756c..a459519 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.states.*;
 import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap;
 import org.apache.hadoop.hdds.server.events.Event;
@@ -94,11 +93,6 @@ public class NodeStateManager implements Runnable, Closeable {
    */
   private final Node2PipelineMap node2PipelineMap;
   /**
-   * Maintains the map from node to ContainerIDs for the containers
-   * available on the node.
-   */
-  private final Node2ContainerMap node2ContainerMap;
-  /**
    * Used for publishing node state change events.
    */
   private final EventPublisher eventPublisher;
@@ -131,7 +125,6 @@ public class NodeStateManager implements Runnable, Closeable {
   public NodeStateManager(Configuration conf, EventPublisher eventPublisher) {
     this.nodeStateMap = new NodeStateMap();
     this.node2PipelineMap = new Node2PipelineMap();
-    this.node2ContainerMap = new Node2ContainerMap();
     this.eventPublisher = eventPublisher;
     this.state2EventMap = new HashMap<>();
     initialiseState2EventMap();
@@ -431,18 +424,6 @@ public class NodeStateManager implements Runnable, Closeable {
   }
 
   /**
-   * Removes a node from NodeStateManager.
-   *
-   * @param datanodeDetails DatanodeDetails
-   *
-   * @throws NodeNotFoundException if the node is not present
-   */
-  public void removeNode(DatanodeDetails datanodeDetails)
-      throws NodeNotFoundException {
-    nodeStateMap.removeNode(datanodeDetails.getUuid());
-  }
-
-  /**
    * Returns the current stats of the node.
    *
    * @param uuid node id
@@ -475,19 +456,6 @@ public class NodeStateManager implements Runnable, Closeable {
   }
 
   /**
-   * Remove the current stats of the specify node.
-   *
-   * @param uuid node id
-   *
-   * @return SCMNodeStat the stat removed from the node.
-   *
-   * @throws NodeNotFoundException if the node is not present.
-   */
-  public SCMNodeStat removeNodeStat(UUID uuid) throws NodeNotFoundException {
-    return nodeStateMap.removeNodeStat(uuid);
-  }
-
-  /**
    * Removes a pipeline from the node2PipelineMap.
    * @param pipeline - Pipeline to be removed
    */
@@ -498,23 +466,11 @@ public class NodeStateManager implements Runnable, Closeable {
    * Update set of containers available on a datanode.
    * @param uuid - DatanodeID
    * @param containerIds - Set of containerIDs
-   * @throws SCMException - if datanode is not known. For new datanode use
-   *                        addDatanodeInContainerMap call.
-   */
-  public void setContainersForDatanode(UUID uuid, Set<ContainerID> containerIds)
-      throws SCMException {
-    node2ContainerMap.setContainersForDatanode(uuid, containerIds);
-  }
-
-  /**
-   * Process containerReport received from datanode.
-   * @param uuid - DataonodeID
-   * @param containerIds - Set of containerIDs
-   * @return The result after processing containerReport
+   * @throws NodeNotFoundException - if datanode is not known.
    */
-  public ReportResult<ContainerID> processContainerReport(UUID uuid,
-      Set<ContainerID> containerIds) {
-    return node2ContainerMap.processReport(uuid, containerIds);
+  public void setContainers(UUID uuid, Set<ContainerID> containerIds)
+      throws NodeNotFoundException {
+    nodeStateMap.setContainers(uuid, containerIds);
   }
 
   /**
@@ -522,20 +478,9 @@ public class NodeStateManager implements Runnable, Closeable {
    * @param uuid - DatanodeID
    * @return - set of containerIDs
    */
-  public Set<ContainerID> getContainers(UUID uuid) {
-    return node2ContainerMap.getContainers(uuid);
-  }
-
-  /**
-   * Insert a new datanode with set of containerIDs for containers available
-   * on it.
-   * @param uuid - DatanodeID
-   * @param containerIDs - Set of ContainerIDs
-   * @throws SCMException - if datanode already exists
-   */
-  public void addDatanodeInContainerMap(UUID uuid,
-      Set<ContainerID> containerIDs) throws SCMException {
-    node2ContainerMap.insertNewDatanode(uuid, containerIDs);
+  public Set<ContainerID> getContainers(UUID uuid)
+      throws NodeNotFoundException {
+    return nodeStateMap.getContainers(uuid);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 35c22f3..c42ef66 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
-import org.apache.hadoop.hdds.scm.node.states.ReportResult;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.scm.VersionInfo;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
@@ -129,16 +128,6 @@ public class SCMNodeManager
     }
   }
 
-  /**
-   * Removes a data node from the management of this Node Manager.
-   *
-   * @param node - DataNode.
-   * @throws NodeNotFoundException
-   */
-  @Override
-  public void removeNode(DatanodeDetails node) throws NodeNotFoundException {
-    nodeStateManager.removeNode(node);
-  }
 
   /**
    * Gets all datanodes that are in a certain state. This function works by
@@ -270,10 +259,8 @@ public class SCMNodeManager
       datanodeDetails.setHostName(dnAddress.getHostName());
       datanodeDetails.setIpAddress(dnAddress.getHostAddress());
     }
-    UUID dnId = datanodeDetails.getUuid();
     try {
       nodeStateManager.addNode(datanodeDetails);
-      nodeStateManager.setNodeStat(dnId, new SCMNodeStat());
       // Updating Node Report, as registration is successful
       updateNodeStat(datanodeDetails.getUuid(), nodeReport);
       LOG.info("Registered Data node : {}", datanodeDetails);
@@ -326,8 +313,9 @@ public class SCMNodeManager
    * @param nodeReport
    */
   @Override
-  public void processNodeReport(UUID dnUuid, NodeReportProto nodeReport) {
-    this.updateNodeStat(dnUuid, nodeReport);
+  public void processNodeReport(DatanodeDetails dnUuid,
+                                NodeReportProto nodeReport) {
+    this.updateNodeStat(dnUuid.getUuid(), nodeReport);
   }
 
   /**
@@ -377,12 +365,12 @@ public class SCMNodeManager
 
   /**
    * Get set of pipelines a datanode is part of.
-   * @param dnId - datanodeID
+   * @param datanodeDetails - DatanodeDetails
    * @return Set of PipelineID
    */
   @Override
-  public Set<PipelineID> getPipelineByDnID(UUID dnId) {
-    return nodeStateManager.getPipelineByDnID(dnId);
+  public Set<PipelineID> getPipelines(DatanodeDetails datanodeDetails) {
+    return nodeStateManager.getPipelineByDnID(datanodeDetails.getUuid());
   }
 
 
@@ -406,50 +394,27 @@ public class SCMNodeManager
 
   /**
    * Update set of containers available on a datanode.
-   * @param uuid - DatanodeID
+   * @param datanodeDetails - DatanodeDetails
    * @param containerIds - Set of containerIDs
-   * @throws SCMException - if datanode is not known. For new datanode use
-   *                        addDatanodeInContainerMap call.
+   * @throws NodeNotFoundException - if datanode is not known.
    */
   @Override
-  public void setContainersForDatanode(UUID uuid,
-      Set<ContainerID> containerIds) throws SCMException {
-    nodeStateManager.setContainersForDatanode(uuid, containerIds);
-  }
-
-  /**
-   * Process containerReport received from datanode.
-   * @param uuid - DataonodeID
-   * @param containerIds - Set of containerIDs
-   * @return The result after processing containerReport
-   */
-  @Override
-  public ReportResult<ContainerID> processContainerReport(UUID uuid,
-      Set<ContainerID> containerIds) {
-    return nodeStateManager.processContainerReport(uuid, containerIds);
+  public void setContainers(DatanodeDetails datanodeDetails,
+      Set<ContainerID> containerIds) throws NodeNotFoundException {
+    nodeStateManager.setContainers(datanodeDetails.getUuid(),
+        containerIds);
   }
 
   /**
    * Return set of containerIDs available on a datanode.
-   * @param uuid - DatanodeID
+   * @param datanodeDetails - DatanodeDetails
    * @return - set of containerIDs
    */
   @Override
-  public Set<ContainerID> getContainers(UUID uuid) {
-    return nodeStateManager.getContainers(uuid);
-  }
-
-  /**
-   * Insert a new datanode with set of containerIDs for containers available
-   * on it.
-   * @param uuid - DatanodeID
-   * @param containerIDs - Set of ContainerIDs
-   * @throws SCMException - if datanode already exists
-   */
-  @Override
-  public void addDatanodeInContainerMap(UUID uuid,
-      Set<ContainerID> containerIDs) throws SCMException {
-    nodeStateManager.addDatanodeInContainerMap(uuid, containerIDs);
+  public Set<ContainerID> getContainers(DatanodeDetails datanodeDetails)
+      throws NodeNotFoundException {
+    return nodeStateManager.getContainers(datanodeDetails.getUuid());
   }
 
   // TODO:
@@ -481,6 +446,7 @@ public class SCMNodeManager
    * @param dnUuid datanode uuid.
    */
   @Override
+  // TODO: This should be removed.
   public void processDeadNode(UUID dnUuid) {
     try {
       SCMNodeStat stat = nodeStateManager.getNodeStat(dnUuid);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
index 9df9dff..268fe5b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
@@ -49,7 +49,7 @@ public class StaleNodeHandler implements EventHandler<DatanodeDetails> {
   public void onMessage(DatanodeDetails datanodeDetails,
                         EventPublisher publisher) {
     Set<PipelineID> pipelineIds =
-        nodeManager.getPipelineByDnID(datanodeDetails.getUuid());
+        nodeManager.getPipelines(datanodeDetails);
     for (PipelineID pipelineID : pipelineIds) {
       try {
         pipelineManager.finalizePipeline(pipelineID);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java
index 774ced1..a917e79 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.scm.node.states;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
 import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
 
@@ -48,6 +49,10 @@ public class NodeStateMap {
    * Represents the current stats of node.
    */
   private final ConcurrentHashMap<UUID, SCMNodeStat> nodeStats;
+  /**
+   * Mapping from node UUID to the set of containers on that node.
+   */
+  private final ConcurrentHashMap<UUID, Set<ContainerID>> nodeToContainer;
 
   private final ReadWriteLock lock;
 
@@ -59,6 +64,7 @@ public class NodeStateMap {
     nodeMap = new ConcurrentHashMap<>();
     stateMap = new ConcurrentHashMap<>();
     nodeStats = new ConcurrentHashMap<>();
+    nodeToContainer = new ConcurrentHashMap<>();
     initStateMap();
   }
 
@@ -88,6 +94,8 @@ public class NodeStateMap {
         throw new NodeAlreadyExistsException("Node UUID: " + id);
       }
       nodeMap.put(id, new DatanodeInfo(datanodeDetails));
+      nodeStats.put(id, new SCMNodeStat());
+      nodeToContainer.put(id, Collections.emptySet());
       stateMap.get(nodeState).add(id);
     } finally {
       lock.writeLock().unlock();
@@ -238,30 +246,6 @@ public class NodeStateMap {
   }
 
   /**
-   * Removes the node from NodeStateMap.
-   *
-   * @param uuid node id
-   *
-   * @throws NodeNotFoundException if the node is not found
-   */
-  public void removeNode(UUID uuid) throws NodeNotFoundException {
-    lock.writeLock().lock();
-    try {
-      if (nodeMap.containsKey(uuid)) {
-        for (Map.Entry<NodeState, Set<UUID>> entry : stateMap.entrySet()) {
-          if(entry.getValue().remove(uuid)) {
-            break;
-          }
-          nodeMap.remove(uuid);
-        }
-        throw new NodeNotFoundException("Node UUID: " + uuid);
-      }
-    } finally {
-      lock.writeLock().unlock();
-    }
-  }
-
-  /**
    * Returns the current stats of the node.
    *
    * @param uuid node id
@@ -298,21 +282,30 @@ public class NodeStateMap {
     nodeStats.put(uuid, newstat);
   }
 
-  /**
-   * Remove the current stats of the specify node.
-   *
-   * @param uuid node id
-   *
-   * @return SCMNodeStat the stat removed from the node.
-   *
-   * @throws NodeNotFoundException if the node is not found
-   */
-  public SCMNodeStat removeNodeStat(UUID uuid) throws NodeNotFoundException {
-    SCMNodeStat stat = nodeStats.remove(uuid);
-    if (stat == null) {
+  public void setContainers(UUID uuid, Set<ContainerID> containers)
+      throws NodeNotFoundException {
+    if (!nodeToContainer.containsKey(uuid)) {
       throw new NodeNotFoundException("Node UUID: " + uuid);
     }
-    return stat;
+    nodeToContainer.put(uuid, containers);
+  }
+
+  public Set<ContainerID> getContainers(UUID uuid)
+      throws NodeNotFoundException {
+    Set<ContainerID> containers = nodeToContainer.get(uuid);
+    if (containers == null) {
+      throw new NodeNotFoundException("Node UUID: " + uuid);
+    }
+    return Collections.unmodifiableSet(containers);
+  }
+
+  public void removeContainer(UUID uuid, ContainerID containerID) throws
+      NodeNotFoundException {
+    Set<ContainerID> containers = nodeToContainer.get(uuid);
+    if (containers == null) {
+      throw new NodeNotFoundException("Node UUID: " + uuid);
+    }
+    containers.remove(containerID);
   }
 
   /**

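Note that addNode() seeds nodeToContainer with Collections.emptySet(), so callers are expected to install a mutable set via setContainers() before removeContainer() has anything to mutate, and getContainers() hands out an unmodifiable view. A usage sketch, with the ContainerID.valueof factory assumed:

    // Sketch: per-node container bookkeeping with the new methods.
    UUID id = datanodeDetails.getUuid();
    nodeStateMap.setContainers(id,
        new HashSet<>(Collections.singleton(ContainerID.valueof(1L))));
    Set<ContainerID> view = nodeStateMap.getContainers(id); // read-only view
    nodeStateMap.removeContainer(id, ContainerID.valueof(1L));
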
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
index 7c7df27..ede8b4f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
@@ -20,7 +20,9 @@ package org.apache.hadoop.hdds.scm.server;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto
-        .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+    .StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.PipelineActionsProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -210,6 +212,19 @@ public final class SCMDatanodeHeartbeatDispatcher {
   }
 
   /**
+   * Incremental Container report event payload with origin.
+   */
+  public static class IncrementalContainerReportFromDatanode
+      extends ReportFromDatanode<IncrementalContainerReportProto> {
+
+    public IncrementalContainerReportFromDatanode(
+        DatanodeDetails datanodeDetails,
+        IncrementalContainerReportProto report) {
+      super(datanodeDetails, report);
+    }
+  }
+
+  /**
    * Container action event payload with origin.
    */
   public static class ContainerActionsFromDatanode

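With this payload class, the dispatcher can translate an incremental report carried in a heartbeat into the new event; a sketch in which the heartbeat accessor name is hypothetical:

    // Sketch: forward an incremental container report as a typed event.
    IncrementalContainerReportProto icr =
        heartbeat.getIncrementalContainerReport(0); // accessor name assumed
    eventPublisher.fireEvent(SCMEvents.INCREMENTAL_CONTAINER_REPORT,
        new IncrementalContainerReportFromDatanode(datanodeDetails, icr));
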
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c80f753b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
index 37c7386..0beceab 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
@@ -97,6 +97,7 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADDRES
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_DEFAULT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_KEY;
 
+import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_REPORT;
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.PIPELINE_REPORT;
 import static org.apache.hadoop.hdds.scm.server.StorageContainerManager.startRpcServer;
 import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress;
@@ -196,8 +197,9 @@ public class SCMDatanodeProtocolServer implements
         .register(datanodeDetails, nodeReport, pipelineReportsProto);
     if (registeredCommand.getError()
         == SCMRegisteredResponseProto.ErrorCode.success) {
-      scm.getContainerManager().processContainerReports(datanodeDetails,
-          containerReportsProto);
+      eventPublisher.fireEvent(CONTAINER_REPORT,
+          new SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode(
+              datanodeDetails, containerReportsProto));
       eventPublisher.fireEvent(SCMEvents.NODE_REGISTRATION_CONT_REPORT,
           new NodeRegistrationContainerReport(datanodeDetails,
               containerReportsProto));

