HDDS-245. Handle ContainerReports in the SCM. Contributed by Elek Marton.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f5dbbfe2 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f5dbbfe2 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f5dbbfe2 Branch: refs/heads/HDFS-12090 Commit: f5dbbfe2e97a8c11e3df0f95ae4a493f11fdbc28 Parents: b2517dd Author: Xiaoyu Yao <x...@apache.org> Authored: Thu Aug 9 16:55:13 2018 -0700 Committer: Xiaoyu Yao <x...@apache.org> Committed: Thu Aug 9 16:55:39 2018 -0700 ---------------------------------------------------------------------- .../hadoop/hdds/server/events/EventQueue.java | 7 +- .../scm/container/ContainerReportHandler.java | 107 +++++- .../replication/ReplicationActivityStatus.java | 86 +++++ .../ReplicationActivityStatusMXBean.java | 28 ++ .../replication/ReplicationRequest.java | 28 +- .../hadoop/hdds/scm/events/SCMEvents.java | 9 + .../hdds/scm/node/states/Node2ContainerMap.java | 10 +- .../hdds/scm/node/states/ReportResult.java | 18 +- .../scm/server/StorageContainerManager.java | 27 +- .../container/TestContainerReportHandler.java | 228 +++++++++++++ .../scm/node/states/Node2ContainerMapTest.java | 308 ----------------- .../scm/node/states/TestNode2ContainerMap.java | 328 +++++++++++++++++++ 12 files changed, 859 insertions(+), 325 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5dbbfe2/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java index f93c54b..b2b0df2 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java +++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java @@ -147,7 +147,12 @@ public class EventQueue implements EventPublisher, AutoCloseable { for (EventHandler handler : executorAndHandlers.getValue()) { queuedCount.incrementAndGet(); - + if (LOG.isDebugEnabled()) { + LOG.debug("Delivering event {} to executor/handler {}: {}", + event.getName(), + executorAndHandlers.getKey().getName(), + payload); + } executorAndHandlers.getKey() .onMessage(handler, payload, this); http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5dbbfe2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java index 486162e..b26eed2 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java @@ -18,30 +18,131 @@ package org.apache.hadoop.hdds.scm.container; +import java.io.IOException; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.replication + .ReplicationActivityStatus; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest; +import org.apache.hadoop.hdds.scm.events.SCMEvents; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap; +import 
org.apache.hadoop.hdds.scm.node.states.ReportResult; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher .ContainerReportFromDatanode; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * Handles container reports from datanode. */ public class ContainerReportHandler implements EventHandler<ContainerReportFromDatanode> { - private final Mapping containerMapping; + private static final Logger LOG = + LoggerFactory.getLogger(ContainerReportHandler.class); + private final Node2ContainerMap node2ContainerMap; + private final Mapping containerMapping; + + private ContainerStateManager containerStateManager; + + private ReplicationActivityStatus replicationStatus; + + public ContainerReportHandler(Mapping containerMapping, - Node2ContainerMap node2ContainerMap) { + Node2ContainerMap node2ContainerMap, + ReplicationActivityStatus replicationActivityStatus) { + Preconditions.checkNotNull(containerMapping); + Preconditions.checkNotNull(node2ContainerMap); + Preconditions.checkNotNull(replicationActivityStatus); this.containerMapping = containerMapping; this.node2ContainerMap = node2ContainerMap; + this.containerStateManager = containerMapping.getStateManager(); + this.replicationStatus = replicationActivityStatus; } @Override public void onMessage(ContainerReportFromDatanode containerReportFromDatanode, EventPublisher publisher) { - // TODO: process container report. 
+ + DatanodeDetails datanodeOrigin = + containerReportFromDatanode.getDatanodeDetails(); + + ContainerReportsProto containerReport = + containerReportFromDatanode.getReport(); + try { + + //update state in container db and trigger close container events + containerMapping.processContainerReports(datanodeOrigin, containerReport); + + Set<ContainerID> containerIds = containerReport.getReportsList().stream() + .map(containerProto -> containerProto.getContainerID()) + .map(ContainerID::new) + .collect(Collectors.toSet()); + + ReportResult reportResult = node2ContainerMap + .processReport(datanodeOrigin.getUuid(), containerIds); + + //we have the report, so we can update the states for the next iteration. + node2ContainerMap + .setContainersForDatanode(datanodeOrigin.getUuid(), containerIds); + + for (ContainerID containerID : reportResult.getMissingContainers()) { + containerStateManager + .removeContainerReplica(containerID, datanodeOrigin); + emitReplicationRequestEvent(containerID, publisher); + } + + for (ContainerID containerID : reportResult.getNewContainers()) { + containerStateManager.addContainerReplica(containerID, datanodeOrigin); + + emitReplicationRequestEvent(containerID, publisher); + } + + } catch (IOException e) { + //TODO: stop all the replication? + LOG.error("Error on processing container report from datanode {}", + datanodeOrigin, e); + } + + } + + private void emitReplicationRequestEvent(ContainerID containerID, + EventPublisher publisher) throws SCMException { + ContainerInfo container = containerStateManager.getContainer(containerID); + + if (container == null) { + //warning unknown container + LOG.warn( + "Container is missing from containerStateManager. Can't request " + + "replication. 
{}", + containerID); + } + if (replicationStatus.isReplicationEnabled()) { + + int existingReplicas = + containerStateManager.getContainerReplicas(containerID).size(); + + int expectedReplicas = container.getReplicationFactor().getNumber(); + + if (existingReplicas != expectedReplicas) { + + publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER, + new ReplicationRequest(containerID.getId(), existingReplicas, + container.getReplicationFactor().getNumber())); + } + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5dbbfe2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatus.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatus.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatus.java new file mode 100644 index 0000000..4a9888c --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatus.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdds.scm.container.replication; + +import javax.management.ObjectName; +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hadoop.hdds.server.events.EventHandler; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.metrics2.util.MBeans; + +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Event listener to track the current state of replication. + */ +public class ReplicationActivityStatus + implements EventHandler<Boolean>, ReplicationActivityStatusMXBean, + Closeable { + + private static final Logger LOG = + LoggerFactory.getLogger(ReplicationActivityStatus.class); + + private AtomicBoolean replicationEnabled = new AtomicBoolean(); + + private ObjectName jmxObjectName; + + public boolean isReplicationEnabled() { + return replicationEnabled.get(); + } + + @VisibleForTesting + public void setReplicationEnabled(boolean enabled) { + replicationEnabled.set(enabled); + } + + @VisibleForTesting + public void enableReplication() { + replicationEnabled.set(true); + } + + /** + * The replication status could be set by async events. 
+ */ + @Override + public void onMessage(Boolean enabled, EventPublisher publisher) { + replicationEnabled.set(enabled); + } + + public void start() { + try { + this.jmxObjectName = + MBeans.register( + "StorageContainerManager", "ReplicationActivityStatus", this); + } catch (Exception ex) { + LOG.error("JMX bean for ReplicationActivityStatus can't be registered", + ex); + } + } + + @Override + public void close() throws IOException { + if (this.jmxObjectName != null) { + MBeans.unregister(jmxObjectName); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5dbbfe2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatusMXBean.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatusMXBean.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatusMXBean.java new file mode 100644 index 0000000..164bd24 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatusMXBean.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.container.replication; + +/** + * JMX interface to monitor replication status. + */ +public interface ReplicationActivityStatusMXBean { + + boolean isReplicationEnabled(); + + void setReplicationEnabled(boolean enabled); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5dbbfe2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationRequest.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationRequest.java index ef7c546..d40cd9c 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationRequest.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationRequest.java @@ -29,18 +29,24 @@ import org.apache.commons.lang3.builder.HashCodeBuilder; public class ReplicationRequest implements Comparable<ReplicationRequest>, Serializable { private final long containerId; - private final short replicationCount; - private final short expecReplicationCount; + private final int replicationCount; + private final int expecReplicationCount; private final long timestamp; - public ReplicationRequest(long containerId, short replicationCount, - long timestamp, short expecReplicationCount) { + public ReplicationRequest(long containerId, int replicationCount, + long timestamp, int expecReplicationCount) { this.containerId = containerId; this.replicationCount = replicationCount; this.timestamp = timestamp; this.expecReplicationCount = expecReplicationCount; } + public ReplicationRequest(long containerId, int replicationCount, + int expecReplicationCount) { + 
this(containerId, replicationCount, System.currentTimeMillis(), + expecReplicationCount); + } + /** * Compares this object with the specified object for order. Returns a * negative integer, zero, or a positive integer as this object is less @@ -93,7 +99,7 @@ public class ReplicationRequest implements Comparable<ReplicationRequest>, return containerId; } - public short getReplicationCount() { + public int getReplicationCount() { return replicationCount; } @@ -101,7 +107,17 @@ public class ReplicationRequest implements Comparable<ReplicationRequest>, return timestamp; } - public short getExpecReplicationCount() { + public int getExpecReplicationCount() { return expecReplicationCount; } + + @Override + public String toString() { + return "ReplicationRequest{" + + "containerId=" + containerId + + ", replicationCount=" + replicationCount + + ", expecReplicationCount=" + expecReplicationCount + + ", timestamp=" + timestamp + + '}'; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5dbbfe2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java index d49dd4f..70b1e96 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java @@ -174,6 +174,15 @@ public final class SCMEvents { new TypedEvent<>(ReplicationCompleted.class); /** + * Signal for all the components (but especially for the replication + * manager and container report handler) that the replication could be + * started. Should be send only if (almost) all the container state are + * available from the datanodes. 
+ */ + public static final TypedEvent<Boolean> START_REPLICATION = + new TypedEvent<>(Boolean.class); + + /** * Private Ctor. Never Constructed. */ private SCMEvents() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5dbbfe2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java index 1960604..8ed6d59 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import java.util.Collections; +import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.TreeSet; @@ -68,7 +69,8 @@ public class Node2ContainerMap { throws SCMException { Preconditions.checkNotNull(containerIDs); Preconditions.checkNotNull(datanodeID); - if(dn2ContainerMap.putIfAbsent(datanodeID, containerIDs) != null) { + if (dn2ContainerMap.putIfAbsent(datanodeID, new HashSet<>(containerIDs)) + != null) { throw new SCMException("Node already exists in the map", DUPLICATE_DATANODE); } @@ -82,11 +84,13 @@ public class Node2ContainerMap { * @throws SCMException - if we don't know about this datanode, for new DN * use insertNewDatanode. 
*/ - public void updateDatanodeMap(UUID datanodeID, Set<ContainerID> containers) + public void setContainersForDatanode(UUID datanodeID, Set<ContainerID> containers) throws SCMException { Preconditions.checkNotNull(datanodeID); Preconditions.checkNotNull(containers); - if(dn2ContainerMap.computeIfPresent(datanodeID, (k, v) -> v) == null){ + if (dn2ContainerMap + .computeIfPresent(datanodeID, (k, v) -> new HashSet<>(containers)) + == null) { throw new SCMException("No such datanode", NO_SUCH_DATANODE); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5dbbfe2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/ReportResult.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/ReportResult.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/ReportResult.java index cb06cb3..2697629 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/ReportResult.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/ReportResult.java @@ -21,10 +21,13 @@ package org.apache.hadoop.hdds.scm.node.states; import org.apache.hadoop.hdds.scm.container.ContainerID; +import java.util.Collections; import java.util.Set; +import com.google.common.base.Preconditions; + /** - * A Container Report gets processsed by the Node2Container and returns the + * A Container Report gets processsed by the Node2Container and returns * Report Result class. 
*/ public class ReportResult { @@ -36,6 +39,8 @@ public class ReportResult { Set<ContainerID> missingContainers, Set<ContainerID> newContainers) { this.status = status; + Preconditions.checkNotNull(missingContainers); + Preconditions.checkNotNull(newContainers); this.missingContainers = missingContainers; this.newContainers = newContainers; } @@ -80,7 +85,16 @@ public class ReportResult { } ReportResult build() { - return new ReportResult(status, missingContainers, newContainers); + + Set<ContainerID> nullSafeMissingContainers = this.missingContainers; + Set<ContainerID> nullSafeNewContainers = this.newContainers; + if (nullSafeNewContainers == null) { + nullSafeNewContainers = Collections.emptySet(); + } + if (nullSafeMissingContainers == null) { + nullSafeMissingContainers = Collections.emptySet(); + } + return new ReportResult(status, nullSafeMissingContainers, nullSafeNewContainers); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5dbbfe2/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 9cb1318..47a9100 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -40,6 +40,8 @@ import org.apache.hadoop.hdds.scm.container.ContainerActionsHandler; import org.apache.hadoop.hdds.scm.container.ContainerMapping; import org.apache.hadoop.hdds.scm.container.ContainerReportHandler; import org.apache.hadoop.hdds.scm.container.Mapping; +import org.apache.hadoop.hdds.scm.container.replication + .ReplicationActivityStatus; import 
org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; import org.apache.hadoop.hdds.scm.container.placement.algorithms @@ -164,9 +166,13 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl * Key = DatanodeUuid, value = ContainerStat. */ private Cache<String, ContainerStat> containerReportCache; + private final ReplicationManager replicationManager; + private final LeaseManager<Long> commandWatcherLeaseManager; + private final ReplicationActivityStatus replicationStatus; + /** * Creates a new StorageContainerManager. Configuration will be updated * with information on the @@ -199,19 +205,26 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl Node2ContainerMap node2ContainerMap = new Node2ContainerMap(); + replicationStatus = new ReplicationActivityStatus(); + CloseContainerEventHandler closeContainerHandler = new CloseContainerEventHandler(scmContainerManager); NodeReportHandler nodeReportHandler = new NodeReportHandler(scmNodeManager); - ContainerReportHandler containerReportHandler = - new ContainerReportHandler(scmContainerManager, node2ContainerMap); + CommandStatusReportHandler cmdStatusReportHandler = new CommandStatusReportHandler(); + NewNodeHandler newNodeHandler = new NewNodeHandler(node2ContainerMap); StaleNodeHandler staleNodeHandler = new StaleNodeHandler(node2ContainerMap); DeadNodeHandler deadNodeHandler = new DeadNodeHandler(node2ContainerMap); ContainerActionsHandler actionsHandler = new ContainerActionsHandler(); + ContainerReportHandler containerReportHandler = + new ContainerReportHandler(scmContainerManager, node2ContainerMap, + replicationStatus); + + eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager); eventQueue.addHandler(SCMEvents.NODE_REPORT, nodeReportHandler); eventQueue.addHandler(SCMEvents.CONTAINER_REPORT, containerReportHandler); @@ -221,6 +234,7 @@ public final class 
StorageContainerManager extends ServiceRuntimeInfoImpl eventQueue.addHandler(SCMEvents.STALE_NODE, staleNodeHandler); eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler); eventQueue.addHandler(SCMEvents.CMD_STATUS_REPORT, cmdStatusReportHandler); + eventQueue.addHandler(SCMEvents.START_REPLICATION, replicationStatus); long watcherTimeout = conf.getTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT, @@ -580,6 +594,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl "server", getDatanodeProtocolServer().getDatanodeRpcAddress())); getDatanodeProtocolServer().start(); + replicationStatus.start(); httpServer.start(); scmBlockManager.start(); replicationManager.start(); @@ -592,6 +607,14 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl public void stop() { try { + LOG.info("Stopping Replication Activity Status tracker."); + replicationStatus.close(); + } catch (Exception ex) { + LOG.error("Replication Activity Status tracker stop failed.", ex); + } + + + try { LOG.info("Stopping Replication Manager Service."); replicationManager.stop(); } catch (Exception ex) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5dbbfe2/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java new file mode 100644 index 0000000..363db99 --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java @@ -0,0 +1,228 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdds.scm.container; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.hdds.scm.TestUtils; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo + .Builder; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.container.replication + .ReplicationActivityStatus; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest; +import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap; +import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher + .ContainerReportFromDatanode; +import 
org.apache.hadoop.hdds.server.events.Event; +import org.apache.hadoop.hdds.server.events.EventPublisher; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import static org.mockito.Matchers.anyLong; +import org.mockito.Mockito; +import static org.mockito.Mockito.when; +import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test the behaviour of the ContainerReportHandler. + */ +public class TestContainerReportHandler implements EventPublisher { + + private List<Object> publishedEvents = new ArrayList<>(); + + private static final Logger LOG = + LoggerFactory.getLogger(TestContainerReportHandler.class); + + @Before + public void resetEventCollector() { + publishedEvents.clear(); + } + + @Test + public void test() throws IOException { + + //given + + OzoneConfiguration conf = new OzoneConfiguration(); + Node2ContainerMap node2ContainerMap = new Node2ContainerMap(); + Mapping mapping = Mockito.mock(Mapping.class); + + when(mapping.getContainer(anyLong())) + .thenAnswer( + (Answer<ContainerInfo>) invocation -> + new Builder() + .setReplicationFactor(ReplicationFactor.THREE) + .setContainerID((Long) invocation.getArguments()[0]) + .build() + ); + + ContainerStateManager containerStateManager = + new ContainerStateManager(conf, mapping); + + when(mapping.getStateManager()).thenReturn(containerStateManager); + + ReplicationActivityStatus replicationActivityStatus = + new ReplicationActivityStatus(); + + ContainerReportHandler reportHandler = + new ContainerReportHandler(mapping, node2ContainerMap, + replicationActivityStatus); + + DatanodeDetails dn1 = TestUtils.randomDatanodeDetails(); + DatanodeDetails dn2 = TestUtils.randomDatanodeDetails(); + DatanodeDetails dn3 = TestUtils.randomDatanodeDetails(); + DatanodeDetails dn4 = TestUtils.randomDatanodeDetails(); + node2ContainerMap.insertNewDatanode(dn1.getUuid(), new HashSet<>()); + node2ContainerMap.insertNewDatanode(dn2.getUuid(), new 
HashSet<>()); + node2ContainerMap.insertNewDatanode(dn3.getUuid(), new HashSet<>()); + node2ContainerMap.insertNewDatanode(dn4.getUuid(), new HashSet<>()); + PipelineSelector pipelineSelector = Mockito.mock(PipelineSelector.class); + + Pipeline pipeline = new Pipeline("leader", LifeCycleState.CLOSED, + ReplicationType.STAND_ALONE, ReplicationFactor.THREE, "pipeline1"); + + when(pipelineSelector.getReplicationPipeline(ReplicationType.STAND_ALONE, + ReplicationFactor.THREE)).thenReturn(pipeline); + + long c1 = containerStateManager + .allocateContainer(pipelineSelector, ReplicationType.STAND_ALONE, + ReplicationFactor.THREE, "root").getContainerInfo() + .getContainerID(); + + long c2 = containerStateManager + .allocateContainer(pipelineSelector, ReplicationType.STAND_ALONE, + ReplicationFactor.THREE, "root").getContainerInfo() + .getContainerID(); + + //when + + //initial reports before replication is enabled. 2 containers w 3 replicas. + reportHandler.onMessage( + new ContainerReportFromDatanode(dn1, + createContainerReport(new long[] {c1, c2})), this); + + reportHandler.onMessage( + new ContainerReportFromDatanode(dn2, + createContainerReport(new long[] {c1, c2})), this); + + reportHandler.onMessage( + new ContainerReportFromDatanode(dn3, + createContainerReport(new long[] {c1, c2})), this); + + reportHandler.onMessage( + new ContainerReportFromDatanode(dn4, + createContainerReport(new long[] {})), this); + + Assert.assertEquals(0, publishedEvents.size()); + + replicationActivityStatus.enableReplication(); + + //no problem here + reportHandler.onMessage( + new ContainerReportFromDatanode(dn1, + createContainerReport(new long[] {c1, c2})), this); + + Assert.assertEquals(0, publishedEvents.size()); + + //container is missing from d2 + reportHandler.onMessage( + new ContainerReportFromDatanode(dn2, + createContainerReport(new long[] {c1})), this); + + Assert.assertEquals(1, publishedEvents.size()); + ReplicationRequest replicationRequest = + (ReplicationRequest) 
publishedEvents.get(0); + + Assert.assertEquals(c2, replicationRequest.getContainerId()); + Assert.assertEquals(3, replicationRequest.getExpecReplicationCount()); + Assert.assertEquals(2, replicationRequest.getReplicationCount()); + + //container was replicated to dn4 + reportHandler.onMessage( + new ContainerReportFromDatanode(dn4, + createContainerReport(new long[] {c2})), this); + + //no more event, everything is perfect + Assert.assertEquals(1, publishedEvents.size()); + + //c2 was found at dn2 (it was missing before, magic) + reportHandler.onMessage( + new ContainerReportFromDatanode(dn2, + createContainerReport(new long[] {c1, c2})), this); + + //c2 is over replicated (dn1,dn2,dn3,dn4) + Assert.assertEquals(2, publishedEvents.size()); + + replicationRequest = + (ReplicationRequest) publishedEvents.get(1); + + Assert.assertEquals(c2, replicationRequest.getContainerId()); + Assert.assertEquals(3, replicationRequest.getExpecReplicationCount()); + Assert.assertEquals(4, replicationRequest.getReplicationCount()); + + } + + private ContainerReportsProto createContainerReport(long[] containerIds) { + + ContainerReportsProto.Builder crBuilder = + ContainerReportsProto.newBuilder(); + + for (long containerId : containerIds) { + org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerInfo.Builder + ciBuilder = org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerInfo.newBuilder(); + ciBuilder.setFinalhash("e16cc9d6024365750ed8dbd194ea46d2") + .setSize(5368709120L) + .setUsed(2000000000L) + .setKeyCount(100000000L) + .setReadCount(100000000L) + .setWriteCount(100000000L) + .setReadBytes(2000000000L) + .setWriteBytes(2000000000L) + .setContainerID(containerId) + .setDeleteTransactionId(0); + + crBuilder.addReports(ciBuilder.build()); + } + + return crBuilder.build(); + } + + @Override + public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void fireEvent( + EVENT_TYPE event, PAYLOAD payload) { + 
LOG.info("Event is published: {}", payload); + publishedEvents.add(payload); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5dbbfe2/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMapTest.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMapTest.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMapTest.java deleted file mode 100644 index 79f1b40..0000000 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMapTest.java +++ /dev/null @@ -1,308 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * - */ - -package org.apache.hadoop.hdds.scm.node.states; - -import org.apache.hadoop.hdds.scm.container.ContainerID; -import org.apache.hadoop.hdds.scm.exceptions.SCMException; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; - -import java.util.Map; -import java.util.Random; -import java.util.Set; -import java.util.TreeSet; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; - -/** - * Test classes for Node2ContainerMap. - */ -public class Node2ContainerMapTest { - private final static int DATANODE_COUNT = 300; - private final static int CONTAINER_COUNT = 1000; - private final Map<UUID, TreeSet<ContainerID>> testData = new - ConcurrentHashMap<>(); - - @Rule - public ExpectedException thrown = ExpectedException.none(); - - private void generateData() { - for (int dnIndex = 1; dnIndex <= DATANODE_COUNT; dnIndex++) { - TreeSet<ContainerID> currentSet = new TreeSet<>(); - for (int cnIndex = 1; cnIndex <= CONTAINER_COUNT; cnIndex++) { - long currentCnIndex = (dnIndex * CONTAINER_COUNT) + cnIndex; - currentSet.add(new ContainerID(currentCnIndex)); - } - testData.put(UUID.randomUUID(), currentSet); - } - } - - private UUID getFirstKey() { - return testData.keySet().iterator().next(); - } - - @Before - public void setUp() throws Exception { - generateData(); - } - - @After - public void tearDown() throws Exception { - } - - @Test - public void testIsKnownDatanode() throws SCMException { - Node2ContainerMap map = new Node2ContainerMap(); - UUID knownNode = getFirstKey(); - UUID unknownNode = UUID.randomUUID(); - Set<ContainerID> containerIDs = testData.get(knownNode); - map.insertNewDatanode(knownNode, containerIDs); - Assert.assertTrue("Not able to detect a known node", - map.isKnownDatanode(knownNode)); - Assert.assertFalse("Unknown node detected", - map.isKnownDatanode(unknownNode)); - } - - @Test - public void 
testInsertNewDatanode() throws SCMException { - Node2ContainerMap map = new Node2ContainerMap(); - UUID knownNode = getFirstKey(); - Set<ContainerID> containerIDs = testData.get(knownNode); - map.insertNewDatanode(knownNode, containerIDs); - Set<ContainerID> readSet = map.getContainers(knownNode); - - // Assert that all elements are present in the set that we read back from - // node map. - Set newSet = new TreeSet((readSet)); - Assert.assertTrue(newSet.removeAll(containerIDs)); - Assert.assertTrue(newSet.size() == 0); - - thrown.expect(SCMException.class); - thrown.expectMessage("already exists"); - map.insertNewDatanode(knownNode, containerIDs); - - map.removeDatanode(knownNode); - map.insertNewDatanode(knownNode, containerIDs); - - } - - @Test - public void testProcessReportCheckOneNode() throws SCMException { - UUID key = getFirstKey(); - Set<ContainerID> values = testData.get(key); - Node2ContainerMap map = new Node2ContainerMap(); - map.insertNewDatanode(key, values); - Assert.assertTrue(map.isKnownDatanode(key)); - ReportResult result = map.processReport(key, values); - Assert.assertEquals(result.getStatus(), - Node2ContainerMap.ReportStatus.ALL_IS_WELL); - } - - @Test - public void testProcessReportInsertAll() throws SCMException { - Node2ContainerMap map = new Node2ContainerMap(); - - for (Map.Entry<UUID, TreeSet<ContainerID>> keyEntry : testData.entrySet()) { - map.insertNewDatanode(keyEntry.getKey(), keyEntry.getValue()); - } - // Assert all Keys are known datanodes. - for (UUID key : testData.keySet()) { - Assert.assertTrue(map.isKnownDatanode(key)); - } - } - - /* - For ProcessReport we have to test the following scenarios. - - 1. New Datanode - A new datanode appears and we have to add that to the - SCM's Node2Container Map. - - 2. New Container - A Datanode exists, but a new container is added to that - DN. We need to detect that and return a list of added containers. - - 3. 
Missing Container - A Datanode exists, but one of the expected container - on that datanode is missing. We need to detect that. - - 4. We get a container report that has both the missing and new containers. - We need to return separate lists for these. - */ - - /** - * Assert that we are able to detect the addition of a new datanode. - * - * @throws SCMException - */ - @Test - public void testProcessReportDetectNewDataNode() throws SCMException { - Node2ContainerMap map = new Node2ContainerMap(); - // If we attempt to process a node that is not present in the map, - // we get a result back that says, NEW_NODE_FOUND. - UUID key = getFirstKey(); - TreeSet<ContainerID> values = testData.get(key); - ReportResult result = map.processReport(key, values); - Assert.assertEquals(Node2ContainerMap.ReportStatus.NEW_DATANODE_FOUND, - result.getStatus()); - Assert.assertEquals(result.getNewContainers().size(), values.size()); - } - - /** - * This test asserts that processReport is able to detect new containers - * when it is added to a datanode. For that we populate the DN with a list - * of containerIDs and then add few more containers and make sure that we - * are able to detect them. - * - * @throws SCMException - */ - @Test - public void testProcessReportDetectNewContainers() throws SCMException { - Node2ContainerMap map = new Node2ContainerMap(); - UUID key = getFirstKey(); - TreeSet<ContainerID> values = testData.get(key); - map.insertNewDatanode(key, values); - - final int newCount = 100; - // This is not a mistake, the treeset seems to be reverse sorted. - ContainerID last = values.pollFirst(); - TreeSet<ContainerID> addedContainers = new TreeSet<>(); - for (int x = 1; x <= newCount; x++) { - long cTemp = last.getId() + x; - addedContainers.add(new ContainerID(cTemp)); - } - - // This set is the super set of existing containers and new containers. 
- TreeSet<ContainerID> newContainersSet = new TreeSet<>(values); - newContainersSet.addAll(addedContainers); - - ReportResult result = map.processReport(key, newContainersSet); - - //Assert that expected size of missing container is same as addedContainers - Assert.assertEquals(Node2ContainerMap.ReportStatus.NEW_CONTAINERS_FOUND, - result.getStatus()); - - Assert.assertEquals(addedContainers.size(), - result.getNewContainers().size()); - - // Assert that the Container IDs are the same as we added new. - Assert.assertTrue("All objects are not removed.", - result.getNewContainers().removeAll(addedContainers)); - } - - /** - * This test asserts that processReport is able to detect missing containers - * if they are misssing from a list. - * - * @throws SCMException - */ - @Test - public void testProcessReportDetectMissingContainers() throws SCMException { - Node2ContainerMap map = new Node2ContainerMap(); - UUID key = getFirstKey(); - TreeSet<ContainerID> values = testData.get(key); - map.insertNewDatanode(key, values); - - final int removeCount = 100; - Random r = new Random(); - - ContainerID first = values.pollLast(); - TreeSet<ContainerID> removedContainers = new TreeSet<>(); - - // Pick a random container to remove it is ok to collide no issues. - for (int x = 0; x < removeCount; x++) { - int startBase = (int) first.getId(); - long cTemp = r.nextInt(values.size()); - removedContainers.add(new ContainerID(cTemp + startBase)); - } - - // This set is a new set with some containers removed. 
- TreeSet<ContainerID> newContainersSet = new TreeSet<>(values); - newContainersSet.removeAll(removedContainers); - - ReportResult result = map.processReport(key, newContainersSet); - - - //Assert that expected size of missing container is same as addedContainers - Assert.assertEquals(Node2ContainerMap.ReportStatus.MISSING_CONTAINERS, - result.getStatus()); - Assert.assertEquals(removedContainers.size(), - result.getMissingContainers().size()); - - // Assert that the Container IDs are the same as we added new. - Assert.assertTrue("All missing containers not found.", - result.getMissingContainers().removeAll(removedContainers)); - } - - @Test - public void testProcessReportDetectNewAndMissingContainers() throws - SCMException { - Node2ContainerMap map = new Node2ContainerMap(); - UUID key = getFirstKey(); - TreeSet<ContainerID> values = testData.get(key); - map.insertNewDatanode(key, values); - - Set<ContainerID> insertedSet = new TreeSet<>(); - // Insert nodes from 1..30 - for (int x = 1; x <= 30; x++) { - insertedSet.add(new ContainerID(x)); - } - - - final int removeCount = 100; - Random r = new Random(); - - ContainerID first = values.pollLast(); - TreeSet<ContainerID> removedContainers = new TreeSet<>(); - - // Pick a random container to remove it is ok to collide no issues. - for (int x = 0; x < removeCount; x++) { - int startBase = (int) first.getId(); - long cTemp = r.nextInt(values.size()); - removedContainers.add(new ContainerID(cTemp + startBase)); - } - - Set<ContainerID> newSet = new TreeSet<>(values); - newSet.addAll(insertedSet); - newSet.removeAll(removedContainers); - - ReportResult result = map.processReport(key, newSet); - - - Assert.assertEquals( - Node2ContainerMap.ReportStatus.MISSING_AND_NEW_CONTAINERS_FOUND, - result.getStatus()); - Assert.assertEquals(removedContainers.size(), - result.getMissingContainers().size()); - - - // Assert that the Container IDs are the same as we added new. 
- Assert.assertTrue("All missing containers not found.", - result.getMissingContainers().removeAll(removedContainers)); - - Assert.assertEquals(insertedSet.size(), - result.getNewContainers().size()); - - // Assert that the Container IDs are the same as we added new. - Assert.assertTrue("All inserted containers are not found.", - result.getNewContainers().removeAll(insertedSet)); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5dbbfe2/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNode2ContainerMap.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNode2ContainerMap.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNode2ContainerMap.java new file mode 100644 index 0000000..633653b --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNode2ContainerMap.java @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.hadoop.hdds.scm.node.states; + +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.TreeSet; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Test classes for Node2ContainerMap. + */ +public class TestNode2ContainerMap { + private final static int DATANODE_COUNT = 300; + private final static int CONTAINER_COUNT = 1000; + private final Map<UUID, TreeSet<ContainerID>> testData = new + ConcurrentHashMap<>(); + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + private void generateData() { + for (int dnIndex = 1; dnIndex <= DATANODE_COUNT; dnIndex++) { + TreeSet<ContainerID> currentSet = new TreeSet<>(); + for (int cnIndex = 1; cnIndex <= CONTAINER_COUNT; cnIndex++) { + long currentCnIndex = (dnIndex * CONTAINER_COUNT) + cnIndex; + currentSet.add(new ContainerID(currentCnIndex)); + } + testData.put(UUID.randomUUID(), currentSet); + } + } + + private UUID getFirstKey() { + return testData.keySet().iterator().next(); + } + + @Before + public void setUp() throws Exception { + generateData(); + } + + @After + public void tearDown() throws Exception { + } + + @Test + public void testIsKnownDatanode() throws SCMException { + Node2ContainerMap map = new Node2ContainerMap(); + UUID knownNode = getFirstKey(); + UUID unknownNode = UUID.randomUUID(); + Set<ContainerID> containerIDs = testData.get(knownNode); + map.insertNewDatanode(knownNode, containerIDs); + Assert.assertTrue("Not able to detect a known node", + map.isKnownDatanode(knownNode)); + Assert.assertFalse("Unknown node detected", + map.isKnownDatanode(unknownNode)); + } + + @Test + public void 
testInsertNewDatanode() throws SCMException { + Node2ContainerMap map = new Node2ContainerMap(); + UUID knownNode = getFirstKey(); + Set<ContainerID> containerIDs = testData.get(knownNode); + map.insertNewDatanode(knownNode, containerIDs); + Set<ContainerID> readSet = map.getContainers(knownNode); + + // Assert that all elements are present in the set that we read back from + // node map. + Set newSet = new TreeSet((readSet)); + Assert.assertTrue(newSet.removeAll(containerIDs)); + Assert.assertTrue(newSet.size() == 0); + + thrown.expect(SCMException.class); + thrown.expectMessage("already exists"); + map.insertNewDatanode(knownNode, containerIDs); + + map.removeDatanode(knownNode); + map.insertNewDatanode(knownNode, containerIDs); + + } + + @Test + public void testProcessReportCheckOneNode() throws SCMException { + UUID key = getFirstKey(); + Set<ContainerID> values = testData.get(key); + Node2ContainerMap map = new Node2ContainerMap(); + map.insertNewDatanode(key, values); + Assert.assertTrue(map.isKnownDatanode(key)); + ReportResult result = map.processReport(key, values); + Assert.assertEquals(result.getStatus(), + Node2ContainerMap.ReportStatus.ALL_IS_WELL); + } + + @Test + public void testUpdateDatanodeMap() throws SCMException { + UUID datanodeId = getFirstKey(); + Set<ContainerID> values = testData.get(datanodeId); + Node2ContainerMap map = new Node2ContainerMap(); + map.insertNewDatanode(datanodeId, values); + Assert.assertTrue(map.isKnownDatanode(datanodeId)); + Assert.assertEquals(CONTAINER_COUNT, map.getContainers(datanodeId).size()); + + //remove one container + values.remove(values.iterator().next()); + Assert.assertEquals(CONTAINER_COUNT - 1, values.size()); + Assert.assertEquals(CONTAINER_COUNT, map.getContainers(datanodeId).size()); + + map.setContainersForDatanode(datanodeId, values); + + Assert.assertEquals(values.size(), map.getContainers(datanodeId).size()); + Assert.assertEquals(values, map.getContainers(datanodeId)); + } + + @Test + public 
void testProcessReportInsertAll() throws SCMException { + Node2ContainerMap map = new Node2ContainerMap(); + + for (Map.Entry<UUID, TreeSet<ContainerID>> keyEntry : testData.entrySet()) { + map.insertNewDatanode(keyEntry.getKey(), keyEntry.getValue()); + } + // Assert all Keys are known datanodes. + for (UUID key : testData.keySet()) { + Assert.assertTrue(map.isKnownDatanode(key)); + } + } + + /* + For ProcessReport we have to test the following scenarios. + + 1. New Datanode - A new datanode appears and we have to add that to the + SCM's Node2Container Map. + + 2. New Container - A Datanode exists, but a new container is added to that + DN. We need to detect that and return a list of added containers. + + 3. Missing Container - A Datanode exists, but one of the expected container + on that datanode is missing. We need to detect that. + + 4. We get a container report that has both the missing and new containers. + We need to return separate lists for these. + */ + + /** + * Assert that we are able to detect the addition of a new datanode. + * + * @throws SCMException + */ + @Test + public void testProcessReportDetectNewDataNode() throws SCMException { + Node2ContainerMap map = new Node2ContainerMap(); + // If we attempt to process a node that is not present in the map, + // we get a result back that says, NEW_NODE_FOUND. + UUID key = getFirstKey(); + TreeSet<ContainerID> values = testData.get(key); + ReportResult result = map.processReport(key, values); + Assert.assertEquals(Node2ContainerMap.ReportStatus.NEW_DATANODE_FOUND, + result.getStatus()); + Assert.assertEquals(result.getNewContainers().size(), values.size()); + } + + /** + * This test asserts that processReport is able to detect new containers + * when it is added to a datanode. For that we populate the DN with a list + * of containerIDs and then add few more containers and make sure that we + * are able to detect them. 
+ * + * @throws SCMException + */ + @Test + public void testProcessReportDetectNewContainers() throws SCMException { + Node2ContainerMap map = new Node2ContainerMap(); + UUID key = getFirstKey(); + TreeSet<ContainerID> values = testData.get(key); + map.insertNewDatanode(key, values); + + final int newCount = 100; + // This is not a mistake, the treeset seems to be reverse sorted. + ContainerID last = values.first(); + TreeSet<ContainerID> addedContainers = new TreeSet<>(); + for (int x = 1; x <= newCount; x++) { + long cTemp = last.getId() + x; + addedContainers.add(new ContainerID(cTemp)); + } + + // This set is the super set of existing containers and new containers. + TreeSet<ContainerID> newContainersSet = new TreeSet<>(values); + newContainersSet.addAll(addedContainers); + + ReportResult result = map.processReport(key, newContainersSet); + + //Assert that expected size of missing container is same as addedContainers + Assert.assertEquals(Node2ContainerMap.ReportStatus.NEW_CONTAINERS_FOUND, + result.getStatus()); + + Assert.assertEquals(addedContainers.size(), + result.getNewContainers().size()); + + // Assert that the Container IDs are the same as we added new. + Assert.assertTrue("All objects are not removed.", + result.getNewContainers().removeAll(addedContainers)); + } + + /** + * This test asserts that processReport is able to detect missing containers + * if they are missing from a list. + * + * @throws SCMException + */ + @Test + public void testProcessReportDetectMissingContainers() throws SCMException { + Node2ContainerMap map = new Node2ContainerMap(); + UUID key = getFirstKey(); + TreeSet<ContainerID> values = testData.get(key); + map.insertNewDatanode(key, values); + + final int removeCount = 100; + Random r = new Random(); + + ContainerID first = values.last(); + TreeSet<ContainerID> removedContainers = new TreeSet<>(); + + // Pick a random container to remove it is ok to collide no issues. 
+ for (int x = 0; x < removeCount; x++) { + int startBase = (int) first.getId(); + long cTemp = r.nextInt(values.size()); + removedContainers.add(new ContainerID(cTemp + startBase)); + } + + // This set is a new set with some containers removed. + TreeSet<ContainerID> newContainersSet = new TreeSet<>(values); + newContainersSet.removeAll(removedContainers); + + ReportResult result = map.processReport(key, newContainersSet); + + + //Assert that expected size of missing containers is same as removedContainers + Assert.assertEquals(Node2ContainerMap.ReportStatus.MISSING_CONTAINERS, + result.getStatus()); + Assert.assertEquals(removedContainers.size(), + result.getMissingContainers().size()); + + // Assert that the Container IDs are the same as we added new. + Assert.assertTrue("All missing containers not found.", + result.getMissingContainers().removeAll(removedContainers)); + } + + @Test + public void testProcessReportDetectNewAndMissingContainers() throws + SCMException { + Node2ContainerMap map = new Node2ContainerMap(); + UUID key = getFirstKey(); + TreeSet<ContainerID> values = testData.get(key); + map.insertNewDatanode(key, values); + + Set<ContainerID> insertedSet = new TreeSet<>(); + // Insert nodes from 1..30 + for (int x = 1; x <= 30; x++) { + insertedSet.add(new ContainerID(x)); + } + + + final int removeCount = 100; + Random r = new Random(); + + ContainerID first = values.last(); + TreeSet<ContainerID> removedContainers = new TreeSet<>(); + + // Pick a random container to remove it is ok to collide no issues. 
+ for (int x = 0; x < removeCount; x++) { + int startBase = (int) first.getId(); + long cTemp = r.nextInt(values.size()); + removedContainers.add(new ContainerID(cTemp + startBase)); + } + + Set<ContainerID> newSet = new TreeSet<>(values); + newSet.addAll(insertedSet); + newSet.removeAll(removedContainers); + + ReportResult result = map.processReport(key, newSet); + + + Assert.assertEquals( + Node2ContainerMap.ReportStatus.MISSING_AND_NEW_CONTAINERS_FOUND, + result.getStatus()); + Assert.assertEquals(removedContainers.size(), + result.getMissingContainers().size()); + + + // Assert that the Container IDs are the same as we added new. + Assert.assertTrue("All missing containers not found.", + result.getMissingContainers().removeAll(removedContainers)); + + Assert.assertEquals(insertedSet.size(), + result.getNewContainers().size()); + + // Assert that the Container IDs are the same as we added new. + Assert.assertTrue("All inserted containers are not found.", + result.getNewContainers().removeAll(insertedSet)); + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org