This is an automated email from the ASF dual-hosted git repository.

avijayan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new c07ccd7  HDDS-4029. Recon unable to add a new container which is in 
CLOSED state. (#1258)
c07ccd7 is described below

commit c07ccd7c23dd63745124f8b509fe4d90c4d5c966
Author: avijayanhwx <[email protected]>
AuthorDate: Mon Aug 3 09:20:28 2020 -0700

    HDDS-4029. Recon unable to add a new container which is in CLOSED state. 
(#1258)
---
 .../hdds/scm/container/ContainerStateManager.java  |  8 +-
 .../ozone/recon/scm/ReconContainerManager.java     | 61 +++++++++++---
 .../recon/scm/ReconContainerReportHandler.java     |  2 +-
 .../ReconIncrementalContainerReportHandler.java    |  2 +-
 .../scm/AbstractReconContainerManagerTest.java     | 45 ++++++++++
 .../ozone/recon/scm/TestReconContainerManager.java | 95 ++++++++++++++++------
 ...TestReconIncrementalContainerReportHandler.java | 63 ++++++++++++++
 7 files changed, 236 insertions(+), 40 deletions(-)

diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
index 5a22521..e575c60 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
@@ -325,8 +325,12 @@ public class ContainerStateManager {
                                Pipeline pipeline) throws IOException {
     Preconditions.checkNotNull(containerInfo);
     containers.addContainer(containerInfo);
-    pipelineManager.addContainerToPipeline(pipeline.getId(),
-        ContainerID.valueof(containerID));
+    if (pipeline != null) {
+      // In Recon, a 'new' non-OPEN (e.g. CLOSED) container comes from SCM
+      // with a random pipeline ID, so the caller passes the pipeline as null.
+      pipelineManager.addContainerToPipeline(pipeline.getId(),
+          ContainerID.valueof(containerID));
+    }
     containerStateCount.incrementAndGet(containerInfo.getState());
   }
 
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
index 72d1548..dff4709 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
@@ -18,16 +18,21 @@
 
 package org.apache.hadoop.ozone.recon.scm;
 
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.FINALIZE;
+
 import java.io.IOException;
 
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.container.SCMContainerManager;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.utils.db.BatchOperationHandler;
 import org.apache.hadoop.hdds.utils.db.Table;
@@ -77,6 +82,7 @@ public class ReconContainerManager extends 
SCMContainerManager {
    * @throws IOException on Error.
    */
   public void checkAndAddNewContainer(ContainerID containerID,
+      ContainerReplicaProto.State replicaState,
       DatanodeDetails datanodeDetails)
       throws IOException {
     if (!exists(containerID)) {
@@ -84,15 +90,34 @@ public class ReconContainerManager extends 
SCMContainerManager {
           datanodeDetails.getHostName());
       ContainerWithPipeline containerWithPipeline =
           scmClient.getContainerWithPipeline(containerID.getId());
-      LOG.debug("Verified new container from SCM {} ",
-          containerWithPipeline.getContainerInfo().containerID());
+      LOG.debug("Verified new container from SCM {}, {} ",
+          containerID, containerWithPipeline.getPipeline().getId());
       // If no other client added this, go ahead and add this container.
       if (!exists(containerID)) {
         addNewContainer(containerID.getId(), containerWithPipeline);
       }
+    } else {
+      // Container already exists in Recon. In SCM, container state changes
+      // to CLOSING first, and then the close command is pushed down to
+      // Datanodes. Recon 'learns' this from the DN, so a healthy non-OPEN
+      // replica moves an OPEN container to 'CLOSING'.
+      ContainerInfo containerInfo = getContainer(containerID);
+      if (containerInfo.getState().equals(HddsProtos.LifeCycleState.OPEN)
+          && !replicaState.equals(ContainerReplicaProto.State.OPEN)
+          && isHealthy(replicaState)) {
+        LOG.info("Container {} has state OPEN, but Replica has State {}.",
+            containerID, replicaState);
+        updateContainerState(containerID, FINALIZE);
+      }
     }
   }
 
+  private boolean isHealthy(ContainerReplicaProto.State replicaState) {
+    return replicaState != ContainerReplicaProto.State.UNHEALTHY
+        && replicaState != ContainerReplicaProto.State.INVALID
+        && replicaState != ContainerReplicaProto.State.DELETED;
+  }
+
   /**
    * Adds a new container to Recon's container manager.
    * @param containerId id
@@ -105,18 +130,32 @@ public class ReconContainerManager extends 
SCMContainerManager {
     ContainerInfo containerInfo = containerWithPipeline.getContainerInfo();
     getLock().lock();
     try {
-      if (getPipelineManager().containsPipeline(
-          containerWithPipeline.getPipeline().getId())) {
-        getContainerStateManager().addContainerInfo(containerId, containerInfo,
-            getPipelineManager(), containerWithPipeline.getPipeline());
+      boolean success = false;
+      if (containerInfo.getState().equals(HddsProtos.LifeCycleState.OPEN)) {
+        PipelineID pipelineID = containerWithPipeline.getPipeline().getId();
+        if (getPipelineManager().containsPipeline(pipelineID)) {
+          getContainerStateManager().addContainerInfo(containerId,
+              containerInfo, getPipelineManager(),
+              containerWithPipeline.getPipeline());
+          success = true;
+        } else {
+          // Got an open container for a pipeline that Recon does not know
+          // about yet. Cannot update internal state until pipeline is synced.
+          LOG.warn(String.format(
+              "Pipeline %s not found. Cannot add container %s",
+              pipelineID, containerInfo.containerID()));
+        }
+      } else {
+        // Non 'Open' Container. No need to worry about pipeline since SCM
+        // returns a random pipelineID.
+        getContainerStateManager().addContainerInfo(containerId,
+            containerInfo, getPipelineManager(), null);
+        success = true;
+      }
+      if (success) {
         addContainerToDB(containerInfo);
         LOG.info("Successfully added container {} to Recon.",
             containerInfo.containerID());
-      } else {
-        throw new IOException(
-            String.format("Pipeline %s not found. Cannot add container %s",
-                containerWithPipeline.getPipeline().getId(),
-                containerInfo.containerID()));
       }
     } catch (IOException ex) {
       LOG.info("Exception while adding container {} .",
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerReportHandler.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerReportHandler.java
index 0bf63a2..228a657 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerReportHandler.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerReportHandler.java
@@ -60,10 +60,10 @@ public class ReconContainerReportHandler extends 
ContainerReportHandler {
           containerReplicaProto.getContainerID());
       try {
         containerManager.checkAndAddNewContainer(id,
+            containerReplicaProto.getState(),
             reportFromDatanode.getDatanodeDetails());
       } catch (IOException ioEx) {
         LOG.error("Exception while checking and adding new container.", ioEx);
-        return;
       }
       LOG.debug("Got container report for containerID {} ",
           containerReplicaProto.getContainerID());
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconIncrementalContainerReportHandler.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconIncrementalContainerReportHandler.java
index b538caf..9e88872 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconIncrementalContainerReportHandler.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconIncrementalContainerReportHandler.java
@@ -73,7 +73,7 @@ public class ReconIncrementalContainerReportHandler
         final ContainerID id = ContainerID.valueof(
             replicaProto.getContainerID());
         try {
-          containerManager.checkAndAddNewContainer(id,
+          containerManager.checkAndAddNewContainer(id, replicaProto.getState(),
               report.getDatanodeDetails());
         } catch (IOException ioEx) {
           LOG.error("Exception while checking and adding new container.", 
ioEx);
diff --git 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java
 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java
index 3114c02..783f42c 100644
--- 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java
+++ 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.ozone.recon.scm;
 import java.io.IOException;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
@@ -33,6 +34,7 @@ import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.hdds.utils.db.DBStore;
 import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
+import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.ozone.recon.persistence.ContainerSchemaManager;
 import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
 
@@ -40,6 +42,7 @@ import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.OP
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.STAND_ALONE;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_NAMES;
+import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.CONTAINERS;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_DIRS;
 import static 
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getRandomPipeline;
 import org.junit.After;
@@ -129,4 +132,46 @@ public class AbstractReconContainerManagerTest {
         .thenReturn(containerWithPipeline);
     return scmServiceProviderMock;
   }
+
+  protected Table<ContainerID, ContainerInfo> getContainerTable()
+      throws IOException {
+    return CONTAINERS.getTable(store);
+  }
+
+  protected ContainerWithPipeline getTestContainer(LifeCycleState state)
+      throws IOException {
+    ContainerID containerID = new ContainerID(100L);
+    Pipeline pipeline = getRandomPipeline();
+    pipelineManager.addPipeline(pipeline);
+    ContainerInfo containerInfo =
+        new ContainerInfo.Builder()
+            .setContainerID(containerID.getId())
+            .setNumberOfKeys(10)
+            .setPipelineID(pipeline.getId())
+            .setReplicationFactor(ONE)
+            .setOwner("test")
+            .setState(state)
+            .setReplicationType(STAND_ALONE)
+            .build();
+    return new ContainerWithPipeline(containerInfo, pipeline);
+  }
+
+  protected ContainerWithPipeline getTestContainer(long id,
+                                                   LifeCycleState state)
+      throws IOException {
+    ContainerID containerID = new ContainerID(id);
+    Pipeline pipeline = getRandomPipeline();
+    pipelineManager.addPipeline(pipeline);
+    ContainerInfo containerInfo =
+        new ContainerInfo.Builder()
+            .setContainerID(containerID.getId())
+            .setNumberOfKeys(10)
+            .setPipelineID(pipeline.getId())
+            .setReplicationFactor(ONE)
+            .setOwner("test")
+            .setState(state)
+            .setReplicationType(STAND_ALONE)
+            .build();
+    return new ContainerWithPipeline(containerInfo, pipeline);
+  }
 }
diff --git 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconContainerManager.java
 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconContainerManager.java
index ccc1c80..9f47779 100644
--- 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconContainerManager.java
+++ 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconContainerManager.java
@@ -19,10 +19,9 @@
 package org.apache.hadoop.ozone.recon.scm;
 
 import static 
org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
-import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.OPEN;
-import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
-import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.STAND_ALONE;
-import static 
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getRandomPipeline;
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.CLOSED;
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.CLOSING;
+import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.OPEN;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -32,10 +31,11 @@ import java.util.List;
 import java.util.NavigableSet;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.junit.Test;
 
 /**
@@ -45,39 +45,57 @@ public class TestReconContainerManager
     extends AbstractReconContainerManagerTest {
 
   @Test
-  public void testAddNewContainer() throws IOException {
-    ContainerID containerID = new ContainerID(100L);
-    Pipeline pipeline = getRandomPipeline();
-    ReconPipelineManager pipelineManager = getPipelineManager();
-    pipelineManager.addPipeline(pipeline);
-    ContainerInfo containerInfo =
-        new ContainerInfo.Builder()
-            .setContainerID(containerID.getId())
-            .setNumberOfKeys(10)
-            .setPipelineID(pipeline.getId())
-            .setReplicationFactor(ONE)
-            .setOwner("test")
-            .setState(OPEN)
-            .setReplicationType(STAND_ALONE)
-            .build();
+  public void testAddNewOpenContainer() throws IOException {
     ContainerWithPipeline containerWithPipeline =
-        new ContainerWithPipeline(containerInfo, pipeline);
+        getTestContainer(LifeCycleState.OPEN);
+    ContainerID containerID =
+        containerWithPipeline.getContainerInfo().containerID();
+    ContainerInfo containerInfo = containerWithPipeline.getContainerInfo();
 
     ReconContainerManager containerManager = getContainerManager();
     assertFalse(containerManager.exists(containerID));
+    assertFalse(getContainerTable().isExist(containerID));
 
     containerManager.addNewContainer(
         containerID.getId(), containerWithPipeline);
 
     assertTrue(containerManager.exists(containerID));
 
-    List<ContainerInfo> containers = containerManager.getContainers(OPEN);
+    List<ContainerInfo> containers =
+        containerManager.getContainers(LifeCycleState.OPEN);
     assertEquals(1, containers.size());
     assertEquals(containerInfo, containers.get(0));
     NavigableSet<ContainerID> containersInPipeline =
-        pipelineManager.getContainersInPipeline(pipeline.getId());
+        getPipelineManager().getContainersInPipeline(
+            containerWithPipeline.getPipeline().getId());
     assertEquals(1, containersInPipeline.size());
     assertEquals(containerID, containersInPipeline.first());
+
+    // Verify container DB.
+    assertTrue(getContainerTable().isExist(containerID));
+  }
+
+  @Test
+  public void testAddNewClosedContainer() throws IOException {
+    ContainerWithPipeline containerWithPipeline = getTestContainer(CLOSED);
+    ContainerID containerID =
+        containerWithPipeline.getContainerInfo().containerID();
+    ContainerInfo containerInfo = containerWithPipeline.getContainerInfo();
+
+    ReconContainerManager containerManager = getContainerManager();
+    assertFalse(containerManager.exists(containerID));
+    assertFalse(getContainerTable().isExist(containerID));
+
+    containerManager.addNewContainer(
+        containerID.getId(), containerWithPipeline);
+
+    assertTrue(containerManager.exists(containerID));
+
+    List<ContainerInfo> containers = containerManager.getContainers(CLOSED);
+    assertEquals(1, containers.size());
+    assertEquals(containerInfo, containers.get(0));
+    // Verify container DB.
+    assertTrue(getContainerTable().isExist(containerID));
   }
 
   @Test
@@ -86,12 +104,39 @@ public class TestReconContainerManager
     ReconContainerManager containerManager = getContainerManager();
     assertFalse(containerManager.exists(containerID));
     DatanodeDetails datanodeDetails = randomDatanodeDetails();
-    containerManager.checkAndAddNewContainer(containerID, datanodeDetails);
+    containerManager.checkAndAddNewContainer(containerID,
+        OPEN, datanodeDetails);
     assertTrue(containerManager.exists(containerID));
 
     // Doing it one more time should not change any state.
-    containerManager.checkAndAddNewContainer(containerID, datanodeDetails);
+    containerManager.checkAndAddNewContainer(containerID, OPEN,
+        datanodeDetails);
     assertTrue(containerManager.exists(containerID));
+    assertEquals(LifeCycleState.OPEN,
+        getContainerManager().getContainer(containerID).getState());
   }
 
+  @Test
+  public void testUpdateContainerStateFromOpen() throws IOException {
+    ContainerWithPipeline containerWithPipeline =
+        getTestContainer(LifeCycleState.OPEN);
+
+    long id = containerWithPipeline.getContainerInfo().getContainerID();
+    ContainerID containerID =
+        containerWithPipeline.getContainerInfo().containerID();
+
+    // Adding container #100.
+    getContainerManager().addNewContainer(id, containerWithPipeline);
+    assertEquals(LifeCycleState.OPEN,
+        getContainerManager().getContainer(containerID).getState());
+
+    DatanodeDetails datanodeDetails = randomDatanodeDetails();
+
+    // First report with "CLOSED" replica state moves container state to
+    // "CLOSING".
+    getContainerManager().checkAndAddNewContainer(containerID, State.CLOSED,
+        datanodeDetails);
+    assertEquals(CLOSING,
+        getContainerManager().getContainer(containerID).getState());
+  }
 }
\ No newline at end of file
diff --git 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconIncrementalContainerReportHandler.java
 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconIncrementalContainerReportHandler.java
index dacf293..1b42f21 100644
--- 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconIncrementalContainerReportHandler.java
+++ 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconIncrementalContainerReportHandler.java
@@ -19,23 +19,28 @@
 package org.apache.hadoop.ozone.recon.scm;
 
 import static 
org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.OPEN;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.Arrays;
 import java.util.UUID;
 
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
+import 
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.net.NetworkTopology;
 import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
@@ -90,6 +95,64 @@ public class TestReconIncrementalContainerReportHandler
     nodeManager.addContainer(datanodeDetails, containerID);
     assertTrue(containerManager.exists(containerID));
     assertEquals(1, containerManager.getContainerReplicas(containerID).size());
+    assertEquals(OPEN, containerManager.getContainer(containerID).getState());
+  }
+
+  @Test
+  public void testProcessICRStateMismatch() throws IOException {
+
+    // Recon container state is "OPEN".
+    // Replica state could be any Non OPEN state.
+    long containerId = 11;
+    for (State state : Arrays.asList(State.CLOSING, State.QUASI_CLOSED,
+        State.CLOSED)) {
+      ContainerWithPipeline containerWithPipeline = getTestContainer(
+          containerId++, OPEN);
+      ContainerID containerID =
+          containerWithPipeline.getContainerInfo().containerID();
+
+      ReconContainerManager containerManager = getContainerManager();
+      containerManager.addNewContainer(containerID.getId(),
+          containerWithPipeline);
+
+      DatanodeDetails datanodeDetails =
+          containerWithPipeline.getPipeline().getFirstNode();
+      NodeManager nodeManagerMock = mock(NodeManager.class);
+      when(nodeManagerMock.getNodeByUuid(any())).thenReturn(datanodeDetails);
+      IncrementalContainerReportFromDatanode reportMock =
+          mock(IncrementalContainerReportFromDatanode.class);
+      when(reportMock.getDatanodeDetails())
+          .thenReturn(containerWithPipeline.getPipeline().getFirstNode());
+
+      IncrementalContainerReportProto containerReport =
+          getIncrementalContainerReportProto(containerID, state,
+              datanodeDetails.getUuidString());
+      when(reportMock.getReport()).thenReturn(containerReport);
+      ReconIncrementalContainerReportHandler reconIcr =
+          new ReconIncrementalContainerReportHandler(nodeManagerMock,
+              containerManager);
+
+      reconIcr.onMessage(reportMock, mock(EventPublisher.class));
+      assertTrue(containerManager.exists(containerID));
+      assertEquals(1,
+          containerManager.getContainerReplicas(containerID).size());
+      LifeCycleState expectedState = getContainerStateFromReplicaState(state);
+      LifeCycleState actualState =
+          containerManager.getContainer(containerID).getState();
+      assertEquals(String.format("Expecting %s in " +
+              "container state for replica state %s", expectedState,
+          state), expectedState, actualState);
+    }
+  }
+
+  private LifeCycleState getContainerStateFromReplicaState(
+      State state) {
+    switch (state) {
+    case CLOSING: return LifeCycleState.CLOSING;
+    case QUASI_CLOSED: return LifeCycleState.QUASI_CLOSED;
+    case CLOSED: return LifeCycleState.CLOSED;
+    default: return null;
+    }
   }
 
   private static IncrementalContainerReportProto


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to