This is an automated email from the ASF dual-hosted git repository.
avijayan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/master by this push:
new c07ccd7 HDDS-4029. Recon unable to add a new container which is in
CLOSED state. (#1258)
c07ccd7 is described below
commit c07ccd7c23dd63745124f8b509fe4d90c4d5c966
Author: avijayanhwx <[email protected]>
AuthorDate: Mon Aug 3 09:20:28 2020 -0700
HDDS-4029. Recon unable to add a new container which is in CLOSED state.
(#1258)
---
.../hdds/scm/container/ContainerStateManager.java | 8 +-
.../ozone/recon/scm/ReconContainerManager.java | 61 +++++++++++---
.../recon/scm/ReconContainerReportHandler.java | 2 +-
.../ReconIncrementalContainerReportHandler.java | 2 +-
.../scm/AbstractReconContainerManagerTest.java | 45 ++++++++++
.../ozone/recon/scm/TestReconContainerManager.java | 95 ++++++++++++++++------
...TestReconIncrementalContainerReportHandler.java | 63 ++++++++++++++
7 files changed, 236 insertions(+), 40 deletions(-)
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
index 5a22521..e575c60 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
@@ -325,8 +325,12 @@ public class ContainerStateManager {
Pipeline pipeline) throws IOException {
Preconditions.checkNotNull(containerInfo);
containers.addContainer(containerInfo);
- pipelineManager.addContainerToPipeline(pipeline.getId(),
- ContainerID.valueof(containerID));
+ if (pipeline != null) {
+ // In Recon, while adding a 'new' CLOSED container, pipeline will be a
+ // random ID, and hence be passed down as null.
+ pipelineManager.addContainerToPipeline(pipeline.getId(),
+ ContainerID.valueof(containerID));
+ }
containerStateCount.incrementAndGet(containerInfo.getState());
}
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
index 72d1548..dff4709 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
@@ -18,16 +18,21 @@
package org.apache.hadoop.ozone.recon.scm;
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.FINALIZE;
+
import java.io.IOException;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.SCMContainerManager;
import
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.utils.db.BatchOperationHandler;
import org.apache.hadoop.hdds.utils.db.Table;
@@ -77,6 +82,7 @@ public class ReconContainerManager extends
SCMContainerManager {
* @throws IOException on Error.
*/
public void checkAndAddNewContainer(ContainerID containerID,
+ ContainerReplicaProto.State replicaState,
DatanodeDetails datanodeDetails)
throws IOException {
if (!exists(containerID)) {
@@ -84,15 +90,34 @@ public class ReconContainerManager extends
SCMContainerManager {
datanodeDetails.getHostName());
ContainerWithPipeline containerWithPipeline =
scmClient.getContainerWithPipeline(containerID.getId());
- LOG.debug("Verified new container from SCM {} ",
- containerWithPipeline.getContainerInfo().containerID());
+ LOG.debug("Verified new container from SCM {}, {} ",
+ containerID, containerWithPipeline.getPipeline().getId());
// If no other client added this, go ahead and add this container.
if (!exists(containerID)) {
addNewContainer(containerID.getId(), containerWithPipeline);
}
+ } else {
+ // Check if container state is not open. In SCM, container state
+ // changes to CLOSING first, and then the close command is pushed down
+ // to Datanodes. Recon 'learns' this from DN, and hence replica state
+ // will move container state to 'CLOSING'.
+ ContainerInfo containerInfo = getContainer(containerID);
+ if (containerInfo.getState().equals(HddsProtos.LifeCycleState.OPEN)
+ && !replicaState.equals(ContainerReplicaProto.State.OPEN)
+ && isHealthy(replicaState)) {
+ LOG.info("Container {} has state OPEN, but Replica has State {}.",
+ containerID, replicaState);
+ updateContainerState(containerID, FINALIZE);
+ }
}
}
+ private boolean isHealthy(ContainerReplicaProto.State replicaState) {
+ return replicaState != ContainerReplicaProto.State.UNHEALTHY
+ && replicaState != ContainerReplicaProto.State.INVALID
+ && replicaState != ContainerReplicaProto.State.DELETED;
+ }
+
/**
* Adds a new container to Recon's container manager.
* @param containerId id
@@ -105,18 +130,32 @@ public class ReconContainerManager extends
SCMContainerManager {
ContainerInfo containerInfo = containerWithPipeline.getContainerInfo();
getLock().lock();
try {
- if (getPipelineManager().containsPipeline(
- containerWithPipeline.getPipeline().getId())) {
- getContainerStateManager().addContainerInfo(containerId, containerInfo,
- getPipelineManager(), containerWithPipeline.getPipeline());
+ boolean success = false;
+ if (containerInfo.getState().equals(HddsProtos.LifeCycleState.OPEN)) {
+ PipelineID pipelineID = containerWithPipeline.getPipeline().getId();
+ if (getPipelineManager().containsPipeline(pipelineID)) {
+ getContainerStateManager().addContainerInfo(containerId,
+ containerInfo, getPipelineManager(),
+ containerWithPipeline.getPipeline());
+ success = true;
+ } else {
+ // Get open container for a pipeline that Recon does not know
+ // about yet. Cannot update internal state until pipeline is synced.
+ LOG.warn(String.format(
+ "Pipeline %s not found. Cannot add container %s",
+ pipelineID, containerInfo.containerID()));
+ }
+ } else {
+ // Non 'Open' Container. No need to worry about pipeline since SCM
+ // returns a random pipelineID.
+ getContainerStateManager().addContainerInfo(containerId,
+ containerInfo, getPipelineManager(), null);
+ success = true;
+ }
+ if (success) {
addContainerToDB(containerInfo);
LOG.info("Successfully added container {} to Recon.",
containerInfo.containerID());
- } else {
- throw new IOException(
- String.format("Pipeline %s not found. Cannot add container %s",
- containerWithPipeline.getPipeline().getId(),
- containerInfo.containerID()));
}
} catch (IOException ex) {
LOG.info("Exception while adding container {} .",
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerReportHandler.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerReportHandler.java
index 0bf63a2..228a657 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerReportHandler.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerReportHandler.java
@@ -60,10 +60,10 @@ public class ReconContainerReportHandler extends
ContainerReportHandler {
containerReplicaProto.getContainerID());
try {
containerManager.checkAndAddNewContainer(id,
+ containerReplicaProto.getState(),
reportFromDatanode.getDatanodeDetails());
} catch (IOException ioEx) {
LOG.error("Exception while checking and adding new container.", ioEx);
- return;
}
LOG.debug("Got container report for containerID {} ",
containerReplicaProto.getContainerID());
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconIncrementalContainerReportHandler.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconIncrementalContainerReportHandler.java
index b538caf..9e88872 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconIncrementalContainerReportHandler.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconIncrementalContainerReportHandler.java
@@ -73,7 +73,7 @@ public class ReconIncrementalContainerReportHandler
final ContainerID id = ContainerID.valueof(
replicaProto.getContainerID());
try {
- containerManager.checkAndAddNewContainer(id,
+ containerManager.checkAndAddNewContainer(id, replicaProto.getState(),
report.getDatanodeDetails());
} catch (IOException ioEx) {
LOG.error("Exception while checking and adding new container.",
ioEx);
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java
index 3114c02..783f42c 100644
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.ozone.recon.scm;
import java.io.IOException;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
@@ -33,6 +34,7 @@ import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
+import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.recon.persistence.ContainerSchemaManager;
import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
@@ -40,6 +42,7 @@ import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.OP
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.STAND_ALONE;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_NAMES;
+import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.CONTAINERS;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_DIRS;
import static
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getRandomPipeline;
import org.junit.After;
@@ -129,4 +132,46 @@ public class AbstractReconContainerManagerTest {
.thenReturn(containerWithPipeline);
return scmServiceProviderMock;
}
+
+ protected Table<ContainerID, ContainerInfo> getContainerTable()
+ throws IOException {
+ return CONTAINERS.getTable(store);
+ }
+
+ protected ContainerWithPipeline getTestContainer(LifeCycleState state)
+ throws IOException {
+ ContainerID containerID = new ContainerID(100L);
+ Pipeline pipeline = getRandomPipeline();
+ pipelineManager.addPipeline(pipeline);
+ ContainerInfo containerInfo =
+ new ContainerInfo.Builder()
+ .setContainerID(containerID.getId())
+ .setNumberOfKeys(10)
+ .setPipelineID(pipeline.getId())
+ .setReplicationFactor(ONE)
+ .setOwner("test")
+ .setState(state)
+ .setReplicationType(STAND_ALONE)
+ .build();
+ return new ContainerWithPipeline(containerInfo, pipeline);
+ }
+
+ protected ContainerWithPipeline getTestContainer(long id,
+ LifeCycleState state)
+ throws IOException {
+ ContainerID containerID = new ContainerID(id);
+ Pipeline pipeline = getRandomPipeline();
+ pipelineManager.addPipeline(pipeline);
+ ContainerInfo containerInfo =
+ new ContainerInfo.Builder()
+ .setContainerID(containerID.getId())
+ .setNumberOfKeys(10)
+ .setPipelineID(pipeline.getId())
+ .setReplicationFactor(ONE)
+ .setOwner("test")
+ .setState(state)
+ .setReplicationType(STAND_ALONE)
+ .build();
+ return new ContainerWithPipeline(containerInfo, pipeline);
+ }
}
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconContainerManager.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconContainerManager.java
index ccc1c80..9f47779 100644
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconContainerManager.java
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconContainerManager.java
@@ -19,10 +19,9 @@
package org.apache.hadoop.ozone.recon.scm;
import static
org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
-import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.OPEN;
-import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
-import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.STAND_ALONE;
-import static
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getRandomPipeline;
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.CLOSED;
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.CLOSING;
+import static
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.OPEN;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -32,10 +31,11 @@ import java.util.List;
import java.util.NavigableSet;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.junit.Test;
/**
@@ -45,39 +45,57 @@ public class TestReconContainerManager
extends AbstractReconContainerManagerTest {
@Test
- public void testAddNewContainer() throws IOException {
- ContainerID containerID = new ContainerID(100L);
- Pipeline pipeline = getRandomPipeline();
- ReconPipelineManager pipelineManager = getPipelineManager();
- pipelineManager.addPipeline(pipeline);
- ContainerInfo containerInfo =
- new ContainerInfo.Builder()
- .setContainerID(containerID.getId())
- .setNumberOfKeys(10)
- .setPipelineID(pipeline.getId())
- .setReplicationFactor(ONE)
- .setOwner("test")
- .setState(OPEN)
- .setReplicationType(STAND_ALONE)
- .build();
+ public void testAddNewOpenContainer() throws IOException {
ContainerWithPipeline containerWithPipeline =
- new ContainerWithPipeline(containerInfo, pipeline);
+ getTestContainer(LifeCycleState.OPEN);
+ ContainerID containerID =
+ containerWithPipeline.getContainerInfo().containerID();
+ ContainerInfo containerInfo = containerWithPipeline.getContainerInfo();
ReconContainerManager containerManager = getContainerManager();
assertFalse(containerManager.exists(containerID));
+ assertFalse(getContainerTable().isExist(containerID));
containerManager.addNewContainer(
containerID.getId(), containerWithPipeline);
assertTrue(containerManager.exists(containerID));
- List<ContainerInfo> containers = containerManager.getContainers(OPEN);
+ List<ContainerInfo> containers =
+ containerManager.getContainers(LifeCycleState.OPEN);
assertEquals(1, containers.size());
assertEquals(containerInfo, containers.get(0));
NavigableSet<ContainerID> containersInPipeline =
- pipelineManager.getContainersInPipeline(pipeline.getId());
+ getPipelineManager().getContainersInPipeline(
+ containerWithPipeline.getPipeline().getId());
assertEquals(1, containersInPipeline.size());
assertEquals(containerID, containersInPipeline.first());
+
+ // Verify container DB.
+ assertTrue(getContainerTable().isExist(containerID));
+ }
+
+ @Test
+ public void testAddNewClosedContainer() throws IOException {
+ ContainerWithPipeline containerWithPipeline = getTestContainer(CLOSED);
+ ContainerID containerID =
+ containerWithPipeline.getContainerInfo().containerID();
+ ContainerInfo containerInfo = containerWithPipeline.getContainerInfo();
+
+ ReconContainerManager containerManager = getContainerManager();
+ assertFalse(containerManager.exists(containerID));
+ assertFalse(getContainerTable().isExist(containerID));
+
+ containerManager.addNewContainer(
+ containerID.getId(), containerWithPipeline);
+
+ assertTrue(containerManager.exists(containerID));
+
+ List<ContainerInfo> containers = containerManager.getContainers(CLOSED);
+ assertEquals(1, containers.size());
+ assertEquals(containerInfo, containers.get(0));
+ // Verify container DB.
+ assertTrue(getContainerTable().isExist(containerID));
}
@Test
@@ -86,12 +104,39 @@ public class TestReconContainerManager
ReconContainerManager containerManager = getContainerManager();
assertFalse(containerManager.exists(containerID));
DatanodeDetails datanodeDetails = randomDatanodeDetails();
- containerManager.checkAndAddNewContainer(containerID, datanodeDetails);
+ containerManager.checkAndAddNewContainer(containerID,
+ OPEN, datanodeDetails);
assertTrue(containerManager.exists(containerID));
// Doing it one more time should not change any state.
- containerManager.checkAndAddNewContainer(containerID, datanodeDetails);
+ containerManager.checkAndAddNewContainer(containerID, OPEN,
+ datanodeDetails);
assertTrue(containerManager.exists(containerID));
+ assertEquals(LifeCycleState.OPEN,
+ getContainerManager().getContainer(containerID).getState());
}
+ @Test
+ public void testUpdateContainerStateFromOpen() throws IOException {
+ ContainerWithPipeline containerWithPipeline =
+ getTestContainer(LifeCycleState.OPEN);
+
+ long id = containerWithPipeline.getContainerInfo().getContainerID();
+ ContainerID containerID =
+ containerWithPipeline.getContainerInfo().containerID();
+
+ // Adding container #100.
+ getContainerManager().addNewContainer(id, containerWithPipeline);
+ assertEquals(LifeCycleState.OPEN,
+ getContainerManager().getContainer(containerID).getState());
+
+ DatanodeDetails datanodeDetails = randomDatanodeDetails();
+
+ // First report with "CLOSED" replica state moves container state to
+ // "CLOSING".
+ getContainerManager().checkAndAddNewContainer(containerID, State.CLOSED,
+ datanodeDetails);
+ assertEquals(CLOSING,
+ getContainerManager().getContainer(containerID).getState());
+ }
}
\ No newline at end of file
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconIncrementalContainerReportHandler.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconIncrementalContainerReportHandler.java
index dacf293..1b42f21 100644
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconIncrementalContainerReportHandler.java
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconIncrementalContainerReportHandler.java
@@ -19,23 +19,28 @@
package org.apache.hadoop.ozone.recon.scm;
import static
org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.OPEN;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.Arrays;
import java.util.UUID;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
import org.apache.hadoop.hdds.scm.container.ContainerID;
+import
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
import org.apache.hadoop.hdds.scm.node.NodeManager;
@@ -90,6 +95,64 @@ public class TestReconIncrementalContainerReportHandler
nodeManager.addContainer(datanodeDetails, containerID);
assertTrue(containerManager.exists(containerID));
assertEquals(1, containerManager.getContainerReplicas(containerID).size());
+ assertEquals(OPEN, containerManager.getContainer(containerID).getState());
+ }
+
+ @Test
+ public void testProcessICRStateMismatch() throws IOException {
+
+ // Recon container state is "OPEN".
+ // Replica state could be any Non OPEN state.
+ long containerId = 11;
+ for (State state : Arrays.asList(State.CLOSING, State.QUASI_CLOSED,
+ State.CLOSED)) {
+ ContainerWithPipeline containerWithPipeline = getTestContainer(
+ containerId++, OPEN);
+ ContainerID containerID =
+ containerWithPipeline.getContainerInfo().containerID();
+
+ ReconContainerManager containerManager = getContainerManager();
+ containerManager.addNewContainer(containerID.getId(),
+ containerWithPipeline);
+
+ DatanodeDetails datanodeDetails =
+ containerWithPipeline.getPipeline().getFirstNode();
+ NodeManager nodeManagerMock = mock(NodeManager.class);
+ when(nodeManagerMock.getNodeByUuid(any())).thenReturn(datanodeDetails);
+ IncrementalContainerReportFromDatanode reportMock =
+ mock(IncrementalContainerReportFromDatanode.class);
+ when(reportMock.getDatanodeDetails())
+ .thenReturn(containerWithPipeline.getPipeline().getFirstNode());
+
+ IncrementalContainerReportProto containerReport =
+ getIncrementalContainerReportProto(containerID, state,
+ datanodeDetails.getUuidString());
+ when(reportMock.getReport()).thenReturn(containerReport);
+ ReconIncrementalContainerReportHandler reconIcr =
+ new ReconIncrementalContainerReportHandler(nodeManagerMock,
+ containerManager);
+
+ reconIcr.onMessage(reportMock, mock(EventPublisher.class));
+ assertTrue(containerManager.exists(containerID));
+ assertEquals(1,
+ containerManager.getContainerReplicas(containerID).size());
+ LifeCycleState expectedState = getContainerStateFromReplicaState(state);
+ LifeCycleState actualState =
+ containerManager.getContainer(containerID).getState();
+ assertEquals(String.format("Expecting %s in " +
+ "container state for replica state %s", expectedState,
+ state), expectedState, actualState);
+ }
+ }
+
+ private LifeCycleState getContainerStateFromReplicaState(
+ State state) {
+ switch (state) {
+ case CLOSING: return LifeCycleState.CLOSING;
+ case QUASI_CLOSED: return LifeCycleState.QUASI_CLOSED;
+ case CLOSED: return LifeCycleState.CLOSED;
+ default: return null;
+ }
}
private static IncrementalContainerReportProto
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]