Repository: hadoop Updated Branches: refs/heads/HDFS-7240 c8a8ee500 -> 74484754a
HDFS-13070. Ozone: SCM: Support for container replica reconciliation - 1. Contributed by Nanda kumar. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/74484754 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/74484754 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/74484754 Branch: refs/heads/HDFS-7240 Commit: 74484754ac30300fce5b5682ee4ba464dfe3108d Parents: c8a8ee5 Author: Nanda kumar <na...@apache.org> Authored: Wed Feb 21 00:40:00 2018 +0530 Committer: Nanda kumar <na...@apache.org> Committed: Wed Feb 21 00:40:00 2018 +0530 ---------------------------------------------------------------------- .../org/apache/hadoop/scm/ScmConfigKeys.java | 6 + .../ozone/scm/StorageContainerManager.java | 4 +- .../ozone/scm/container/ContainerMapping.java | 27 +- .../hadoop/ozone/scm/container/Mapping.java | 14 +- .../ContainerReplicationManager.java | 308 ----------------- .../replication/ContainerSupervisor.java | 333 +++++++++++++++++++ .../container/replication/InProgressPool.java | 40 ++- .../hadoop/ozone/scm/node/NodeManager.java | 6 + .../hadoop/ozone/scm/node/SCMNodeManager.java | 5 + .../TestUtils/ReplicationNodeManagerMock.java | 20 +- .../TestContainerReplicationManager.java | 264 --------------- .../replication/TestContainerSupervisor.java | 269 +++++++++++++++ .../ozone/scm/container/MockNodeManager.java | 7 + .../scm/container/TestContainerMapping.java | 18 +- 14 files changed, 703 insertions(+), 618 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/74484754/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java index 6f5c873..fbe9637 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java @@ -224,6 +224,12 @@ public final class ScmConfigKeys { OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_DEFAULT = "60s"; /** + * This determines the total number of pools to be processed in parallel. + */ + public static final String OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS = + "ozone.scm.max.nodepool.processing.threads"; + public static final int OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS_DEFAULT = 1; + /** * These 2 settings control the number of threads in executor pool and time * outs for thw container reports from all nodes. */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/74484754/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java index 3276db8..4245fe0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java @@ -984,9 +984,7 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl updateContainerReportMetrics(reports); // should we process container reports async? 
- scmContainerManager.processContainerReports( - DatanodeID.getFromProtoBuf(reports.getDatanodeID()), - reports.getType(), reports.getReportsList()); + scmContainerManager.processContainerReports(reports); return ContainerReportsResponseProto.newBuilder().build(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/74484754/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java index fe86064..8a82c82 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java @@ -20,7 +20,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.lease.Lease; import org.apache.hadoop.ozone.lease.LeaseException; @@ -29,7 +28,9 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType; import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; +import 
org.apache.hadoop.ozone.scm.container.replication.ContainerSupervisor; import org.apache.hadoop.ozone.scm.exceptions.SCMException; import org.apache.hadoop.ozone.scm.node.NodeManager; import org.apache.hadoop.ozone.scm.pipelines.PipelineSelector; @@ -74,6 +75,7 @@ public class ContainerMapping implements Mapping { private final PipelineSelector pipelineSelector; private final ContainerStateManager containerStateManager; private final LeaseManager<ContainerInfo> containerLeaseManager; + private final ContainerSupervisor containerSupervisor; private final float containerCloseThreshold; /** @@ -113,6 +115,9 @@ public class ContainerMapping implements Mapping { this.pipelineSelector = new PipelineSelector(nodeManager, conf); this.containerStateManager = new ContainerStateManager(conf, this); + this.containerSupervisor = + new ContainerSupervisor(conf, nodeManager, + nodeManager.getNodePoolManager()); this.containerCloseThreshold = conf.getFloat( ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD, ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD_DEFAULT); @@ -347,16 +352,14 @@ public class ContainerMapping implements Mapping { /** * Process container report from Datanode. 
* - * @param datanodeID Datanode ID - * @param reportType Type of report - * @param containerInfos container details + * @param reports Container report */ @Override - public void processContainerReports( - DatanodeID datanodeID, - ContainerReportsRequestProto.reportType reportType, - List<StorageContainerDatanodeProtocolProtos.ContainerInfo> - containerInfos) throws IOException { + public void processContainerReports(ContainerReportsRequestProto reports) + throws IOException { + List<StorageContainerDatanodeProtocolProtos.ContainerInfo> + containerInfos = reports.getReportsList(); + containerSupervisor.handleContainerReport(reports); for (StorageContainerDatanodeProtocolProtos.ContainerInfo containerInfo : containerInfos) { byte[] dbKey = containerInfo.getContainerNameBytes().toByteArray(); @@ -395,7 +398,7 @@ public class ContainerMapping implements Mapping { // TODO: Handling of containers which are already in close queue. if (containerUsedPercentage >= containerCloseThreshold) { // TODO: The container has to be moved to close container queue. - // For now, we are just updating the container state to CLOSED. + // For now, we are just updating the container state to CLOSING. // Close container implementation can decide on how to maintain // list of containers to be closed, this is the place where we // have to add the containers to that list. @@ -412,7 +415,7 @@ public class ContainerMapping implements Mapping { // Container not found in our container db. 
LOG.error("Error while processing container report from datanode :" + " {}, for container: {}, reason: container doesn't exist in" + - "container database.", datanodeID, + "container database.", reports.getDatanodeID(), containerInfo.getContainerName()); } } finally { http://git-wip-us.apache.org/repos/asf/hadoop/blob/74484754/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java index 577571f..0d442d1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java @@ -17,11 +17,8 @@ package org.apache.hadoop.ozone.scm.container; -import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; import org.apache.hadoop.ozone.protocol.proto - .StorageContainerDatanodeProtocolProtos; -import org.apache.hadoop.ozone.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; import org.apache.hadoop.scm.container.common.helpers.ContainerInfo; @@ -102,14 +99,9 @@ public interface Mapping extends Closeable { /** * Process container report from Datanode. 
* - * @param datanodeID Datanode ID - * @param reportType Type of report - * @param containerInfos container details + * @param reports Container report */ - void processContainerReports( - DatanodeID datanodeID, - ContainerReportsRequestProto.reportType reportType, - List<StorageContainerDatanodeProtocolProtos.ContainerInfo> - containerInfos) throws IOException; + void processContainerReports(ContainerReportsRequestProto reports) + throws IOException; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/74484754/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerReplicationManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerReplicationManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerReplicationManager.java deleted file mode 100644 index f9b86f5..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerReplicationManager.java +++ /dev/null @@ -1,308 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.ozone.scm.container.replication; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.conf.OzoneConfiguration; -import org.apache.hadoop.ozone.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; -import org.apache.hadoop.ozone.scm.exceptions.SCMException; -import org.apache.hadoop.ozone.scm.node.CommandQueue; -import org.apache.hadoop.ozone.scm.node.NodeManager; -import org.apache.hadoop.ozone.scm.node.NodePoolManager; -import org.apache.hadoop.util.Time; -import org.apache.hadoop.util.concurrent.HadoopExecutors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Closeable; -import java.io.IOException; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.PriorityQueue; -import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import static com.google.common.util.concurrent.Uninterruptibles - .sleepUninterruptibly; -import static org.apache.hadoop.scm.ScmConfigKeys - .OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_DEFAULT; -import static org.apache.hadoop.scm.ScmConfigKeys - .OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT; -import static org.apache.hadoop.scm.ScmConfigKeys - .OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_DEFAULT; -import static org.apache.hadoop.scm.ScmConfigKeys - .OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL; -import static org.apache.hadoop.scm.ScmConfigKeys - .OZONE_SCM_MAX_CONTAINER_REPORT_THREADS; -import static org.apache.hadoop.scm.ScmConfigKeys - 
.OZONE_SCM_MAX_CONTAINER_REPORT_THREADS_DEFAULT; - -/** - * This class takes a set of container reports that belong to a pool and then - * computes the replication levels for each container. - */ -public class ContainerReplicationManager implements Closeable { - public static final Logger LOG = - LoggerFactory.getLogger(ContainerReplicationManager.class); - - private final NodePoolManager poolManager; - private final CommandQueue commandQueue; - private final HashSet<String> poolNames; - private final PriorityQueue<PeriodicPool> poolQueue; - private final NodeManager nodeManager; - private final long containerProcessingLag; - private final AtomicBoolean runnable; - private final ExecutorService executorService; - private final long maxPoolWait; - private long poolProcessCount; - private final List<InProgressPool> inProgressPoolList; - private final AtomicInteger threadFaultCount; - - /** - * Returns the number of times we have processed pools. - * @return long - */ - public long getPoolProcessCount() { - return poolProcessCount; - } - - - /** - * Constructs a class that computes Replication Levels. - * - * @param conf - OzoneConfiguration - * @param nodeManager - Node Manager - * @param poolManager - Pool Manager - * @param commandQueue - Datanodes Command Queue. 
- */ - public ContainerReplicationManager(OzoneConfiguration conf, - NodeManager nodeManager, NodePoolManager poolManager, - CommandQueue commandQueue) { - Preconditions.checkNotNull(poolManager); - Preconditions.checkNotNull(commandQueue); - Preconditions.checkNotNull(nodeManager); - this.containerProcessingLag = - conf.getTimeDuration(OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL, - OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_DEFAULT, - TimeUnit.SECONDS - ) * 1000; - int maxContainerReportThreads = - conf.getInt(OZONE_SCM_MAX_CONTAINER_REPORT_THREADS, - OZONE_SCM_MAX_CONTAINER_REPORT_THREADS_DEFAULT - ); - this.maxPoolWait = - conf.getTimeDuration(OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT, - OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_DEFAULT, - TimeUnit.MILLISECONDS); - this.poolManager = poolManager; - this.commandQueue = commandQueue; - this.nodeManager = nodeManager; - this.poolNames = new HashSet<>(); - this.poolQueue = new PriorityQueue<>(); - runnable = new AtomicBoolean(true); - this.threadFaultCount = new AtomicInteger(0); - executorService = HadoopExecutors.newCachedThreadPool( - new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("Container Reports Processing Thread - %d") - .build(), maxContainerReportThreads); - inProgressPoolList = new LinkedList<>(); - - initPoolProcessThread(); - } - - /** - * Returns the number of pools that are under process right now. - * @return int - Number of pools that are in process. - */ - public int getInProgressPoolCount() { - return inProgressPoolList.size(); - } - - /** - * Exits the background thread. - */ - public void setExit() { - this.runnable.set(false); - } - - /** - * Adds or removes pools from names that we need to process. - * - * There are two different cases that we need to process. - * The case where some pools are being added and some times we have to - * handle cases where pools are removed. 
- */ - private void refreshPools() { - List<String> pools = this.poolManager.getNodePools(); - if (pools != null) { - - HashSet<String> removedPools = - computePoolDifference(this.poolNames, new HashSet<>(pools)); - - HashSet<String> addedPools = - computePoolDifference(new HashSet<>(pools), this.poolNames); - // TODO: Support remove pool API in pool manager so that this code - // path can be tested. This never happens in the current code base. - for (String poolName : removedPools) { - for (PeriodicPool periodicPool : poolQueue) { - if (periodicPool.getPoolName().compareTo(poolName) == 0) { - poolQueue.remove(periodicPool); - } - } - } - // Remove the pool names that we have in the list. - this.poolNames.removeAll(removedPools); - - for (String poolName : addedPools) { - poolQueue.add(new PeriodicPool(poolName)); - } - - // Add to the pool names we are tracking. - poolNames.addAll(addedPools); - } - - } - - /** - * Handle the case where pools are added. - * - * @param newPools - New Pools list - * @param oldPool - oldPool List. - */ - private HashSet<String> computePoolDifference(HashSet<String> newPools, - Set<String> oldPool) { - Preconditions.checkNotNull(newPools); - Preconditions.checkNotNull(oldPool); - HashSet<String> newSet = new HashSet<>(newPools); - newSet.removeAll(oldPool); - return newSet; - } - - private void initPoolProcessThread() { - - /* - * Task that runs to check if we need to start a pool processing job. - * if so we create a pool reconciliation job and find out of all the - * expected containers are on the nodes. - */ - Runnable processPools = () -> { - while (runnable.get()) { - // Make sure that we don't have any new pools. 
- refreshPools(); - PeriodicPool pool = poolQueue.poll(); - if (pool != null) { - if (pool.getLastProcessedTime() + this.containerProcessingLag < - Time.monotonicNow()) { - LOG.debug("Adding pool {} to container processing queue", pool - .getPoolName()); - InProgressPool inProgressPool = new InProgressPool(maxPoolWait, - pool, this.nodeManager, this.poolManager, this.commandQueue, - this.executorService); - inProgressPool.startReconciliation(); - inProgressPoolList.add(inProgressPool); - poolProcessCount++; - - } else { - - LOG.debug("Not within the time window for processing: {}", - pool.getPoolName()); - // Put back this pool since we are not planning to process it. - poolQueue.add(pool); - // we might over sleep here, not a big deal. - sleepUninterruptibly(this.containerProcessingLag, - TimeUnit.MILLISECONDS); - } - } - sleepUninterruptibly(this.maxPoolWait, TimeUnit.MILLISECONDS); - } - }; - - // We will have only one thread for pool processing. - Thread poolProcessThread = new Thread(processPools); - poolProcessThread.setDaemon(true); - poolProcessThread.setName("Pool replica thread"); - poolProcessThread.setUncaughtExceptionHandler((Thread t, Throwable e) -> { - // Let us just restart this thread after logging a critical error. - // if this thread is not running we cannot handle commands from SCM. - LOG.error("Critical Error : Pool replica thread encountered an " + - "error. Thread: {} Error Count : {}", t.toString(), e, - threadFaultCount.incrementAndGet()); - poolProcessThread.start(); - // TODO : Add a config to restrict how many times we will restart this - // thread in a single session. - }); - poolProcessThread.start(); - } - - /** - * Adds a container report to appropriate inProgress Pool. - * @param containerReport -- Container report for a specific container from - * a datanode. 
- */ - public void handleContainerReport( - ContainerReportsRequestProto containerReport) { - String poolName = null; - DatanodeID datanodeID = DatanodeID - .getFromProtoBuf(containerReport.getDatanodeID()); - try { - poolName = poolManager.getNodePool(datanodeID); - } catch (SCMException e) { - LOG.warn("Skipping processing container report from datanode {}, " - + "cause: failed to get the corresponding node pool", - datanodeID.toString(), e); - return; - } - - for(InProgressPool ppool : inProgressPoolList) { - if(ppool.getPoolName().equalsIgnoreCase(poolName)) { - ppool.handleContainerReport(containerReport); - return; - } - } - // TODO: Decide if we can do anything else with this report. - LOG.debug("Discarding the container report for pool {}. That pool is not " + - "currently in the pool reconciliation process. Container Name: {}", - poolName, containerReport.getDatanodeID()); - } - - /** - * Get in process pool list, used for testing. - * @return List of InProgressPool - */ - @VisibleForTesting - public List<InProgressPool> getInProcessPoolList() { - return inProgressPoolList; - } - - /** - * Shutdown the Container Replication Manager. 
- * @throws IOException if an I/O error occurs - */ - @Override - public void close() throws IOException { - setExit(); - HadoopExecutors.shutdown(executorService, LOG, 5, TimeUnit.SECONDS); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/74484754/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerSupervisor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerSupervisor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerSupervisor.java new file mode 100644 index 0000000..c063b8b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerSupervisor.java @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.hadoop.ozone.scm.container.replication; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; +import org.apache.hadoop.ozone.scm.exceptions.SCMException; +import org.apache.hadoop.ozone.scm.node.NodeManager; +import org.apache.hadoop.ozone.scm.node.NodePoolManager; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.util.concurrent.HadoopExecutors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static com.google.common.util.concurrent.Uninterruptibles + .sleepUninterruptibly; +import static org.apache.hadoop.scm.ScmConfigKeys + .OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_DEFAULT; +import static org.apache.hadoop.scm.ScmConfigKeys + .OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT; +import static org.apache.hadoop.scm.ScmConfigKeys + .OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_DEFAULT; +import static org.apache.hadoop.scm.ScmConfigKeys + .OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL; +import static org.apache.hadoop.scm.ScmConfigKeys + .OZONE_SCM_MAX_CONTAINER_REPORT_THREADS; +import static org.apache.hadoop.scm.ScmConfigKeys + .OZONE_SCM_MAX_CONTAINER_REPORT_THREADS_DEFAULT; +import static 
org.apache.hadoop.scm.ScmConfigKeys + .OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS; +import static org.apache.hadoop.scm.ScmConfigKeys + .OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS_DEFAULT; + +/** + * This class takes a set of container reports that belong to a pool and then + * computes the replication levels for each container. + */ +public class ContainerSupervisor implements Closeable { + public static final Logger LOG = + LoggerFactory.getLogger(ContainerSupervisor.class); + + private final NodePoolManager poolManager; + private final HashSet<String> poolNames; + private final PriorityQueue<PeriodicPool> poolQueue; + private final NodeManager nodeManager; + private final long containerProcessingLag; + private final AtomicBoolean runnable; + private final ExecutorService executorService; + private final long maxPoolWait; + private long poolProcessCount; + private final List<InProgressPool> inProgressPoolList; + private final AtomicInteger threadFaultCount; + private final int inProgressPoolMaxCount; + + private final ReadWriteLock inProgressPoolListLock; + + /** + * Returns the number of times we have processed pools. + * @return long + */ + public long getPoolProcessCount() { + return poolProcessCount; + } + + + /** + * Constructs a class that computes Replication Levels. 
+ * + * @param conf - OzoneConfiguration + * @param nodeManager - Node Manager + * @param poolManager - Pool Manager + */ + public ContainerSupervisor(Configuration conf, NodeManager nodeManager, + NodePoolManager poolManager) { + Preconditions.checkNotNull(poolManager); + Preconditions.checkNotNull(nodeManager); + this.containerProcessingLag = + conf.getTimeDuration(OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL, + OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_DEFAULT, + TimeUnit.SECONDS + ) * 1000; + int maxContainerReportThreads = + conf.getInt(OZONE_SCM_MAX_CONTAINER_REPORT_THREADS, + OZONE_SCM_MAX_CONTAINER_REPORT_THREADS_DEFAULT + ); + this.maxPoolWait = + conf.getTimeDuration(OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT, + OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_DEFAULT, + TimeUnit.MILLISECONDS); + this.inProgressPoolMaxCount = conf.getInt( + OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS, + OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS_DEFAULT); + this.poolManager = poolManager; + this.nodeManager = nodeManager; + this.poolNames = new HashSet<>(); + this.poolQueue = new PriorityQueue<>(); + this.runnable = new AtomicBoolean(true); + this.threadFaultCount = new AtomicInteger(0); + this.executorService = HadoopExecutors.newCachedThreadPool( + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("Container Reports Processing Thread - %d") + .build(), maxContainerReportThreads); + this.inProgressPoolList = new LinkedList<>(); + this.inProgressPoolListLock = new ReentrantReadWriteLock(); + + initPoolProcessThread(); + } + + /** + * Returns the number of pools that are under process right now. + * @return int - Number of pools that are in process. + */ + public int getInProgressPoolCount() { + return inProgressPoolList.size(); + } + + /** + * Exits the background thread. + */ + public void setExit() { + this.runnable.set(false); + } + + /** + * Adds or removes pools from names that we need to process. + * + * There are two different cases that we need to process. 
+   * The case where some pools are being added and sometimes we have to
+   * handle cases where pools are removed.
+   */
+  private void refreshPools() {
+    List<String> pools = this.poolManager.getNodePools();
+    if (pools != null) {
+
+      HashSet<String> removedPools =
+          computePoolDifference(this.poolNames, new HashSet<>(pools));
+
+      HashSet<String> addedPools =
+          computePoolDifference(new HashSet<>(pools), this.poolNames);
+      // TODO: Support remove pool API in pool manager so that this code
+      // path can be tested. This never happens in the current code base.
+      //
+      // removeIf is used on purpose: removing entries from poolQueue inside
+      // a for-each loop over poolQueue would modify the collection while it
+      // is being iterated.
+      poolQueue.removeIf(
+          queued -> removedPools.contains(queued.getPoolName()));
+      // Remove the pool names that we have in the list.
+      this.poolNames.removeAll(removedPools);
+
+      for (String poolName : addedPools) {
+        poolQueue.add(new PeriodicPool(poolName));
+      }
+
+      // Add to the pool names we are tracking.
+      poolNames.addAll(addedPools);
+    }
+
+  }
+
+  /**
+   * Computes the set difference {@code newPools - oldPool}: the names that
+   * are present in the first set but absent from the second. Neither input
+   * is modified; the result is a freshly allocated set.
+   *
+   * @param newPools - set to take the names from.
+   * @param oldPool - names to exclude from the result.
+   * @return names contained in newPools but not in oldPool.
+   */
+  private HashSet<String> computePoolDifference(HashSet<String> newPools,
+      Set<String> oldPool) {
+    Preconditions.checkNotNull(newPools);
+    Preconditions.checkNotNull(oldPool);
+    HashSet<String> newSet = new HashSet<>(newPools);
+    newSet.removeAll(oldPool);
+    return newSet;
+  }
+
+  /**
+   * Creates and starts the daemon thread ("Pool replica thread") that drives
+   * pool reconciliation. If the thread dies with an uncaught exception, the
+   * failure is logged and a replacement thread is started as long as the
+   * supervisor is still marked runnable.
+   */
+  private void initPoolProcessThread() {
+
+    /*
+     * Task that runs to check if we need to start a pool processing job.
+     * if so we create a pool reconciliation job and find out if all the
+     * expected containers are on the nodes.
+     */
+    Runnable processPools = () -> {
+      while (runnable.get()) {
+        // Make sure that we don't have any new pools.
+        refreshPools();
+        while (inProgressPoolList.size() < inProgressPoolMaxCount) {
+          PeriodicPool pool = poolQueue.poll();
+          if (pool != null) {
+            if (pool.getLastProcessedTime() + this.containerProcessingLag >
+                Time.monotonicNow()) {
+              LOG.debug("Not within the time window for processing: {}",
+                  pool.getPoolName());
+              // we might over sleep here, not a big deal.
+              sleepUninterruptibly(this.containerProcessingLag,
+                  TimeUnit.MILLISECONDS);
+            }
+            LOG.debug("Adding pool {} to container processing queue",
+                pool.getPoolName());
+            InProgressPool inProgressPool = new InProgressPool(maxPoolWait,
+                pool, this.nodeManager, this.poolManager, this.executorService);
+            inProgressPool.startReconciliation();
+            inProgressPoolListLock.writeLock().lock();
+            try {
+              inProgressPoolList.add(inProgressPool);
+            } finally {
+              inProgressPoolListLock.writeLock().unlock();
+            }
+            poolProcessCount++;
+          } else {
+            break;
+          }
+        }
+        // Give the datanodes time to send their reports before finalizing.
+        sleepUninterruptibly(this.maxPoolWait, TimeUnit.MILLISECONDS);
+        inProgressPoolListLock.readLock().lock();
+        try {
+          for (InProgressPool inProgressPool : inProgressPoolList) {
+            inProgressPool.finalizeReconciliation();
+            // Put the pool back so that it is picked up again later.
+            poolQueue.add(inProgressPool.getPool());
+          }
+        } finally {
+          inProgressPoolListLock.readLock().unlock();
+        }
+        inProgressPoolListLock.writeLock().lock();
+        try {
+          inProgressPoolList.clear();
+        } finally {
+          inProgressPoolListLock.writeLock().unlock();
+        }
+      }
+    };
+
+    // We will have only one thread for pool processing.
+    Thread poolProcessThread = new Thread(processPools);
+    poolProcessThread.setDaemon(true);
+    poolProcessThread.setName("Pool replica thread");
+    poolProcessThread.setUncaughtExceptionHandler((Thread t, Throwable e) -> {
+      // if this thread is not running we cannot handle commands from SCM.
+      // The Throwable is passed last so that the logger records it as the
+      // exception; the two placeholders receive the thread and the count.
+      LOG.error("Critical Error : Pool replica thread encountered an " +
+          "error. Thread: {} Error Count : {}", t.toString(),
+          threadFaultCount.incrementAndGet(), e);
+      // A Thread object can be started only once, so calling start() again
+      // on the failed thread would throw IllegalThreadStateException.
+      // Build and start a fresh thread instead, unless we are shutting down.
+      if (runnable.get()) {
+        initPoolProcessThread();
+      }
+      // TODO : Add a config to restrict how many times we will restart this
+      // thread in a single session.
+    });
+    poolProcessThread.start();
+  }
+
+  /**
+   * Adds a container report to appropriate inProgress Pool. The pool of the
+   * reporting datanode is looked up through the pool manager; the report is
+   * handed to the in-progress pool with that name (compared ignoring case).
+   * If no such pool is being reconciled the report is discarded, and if the
+   * pool lookup fails with an SCMException the report is skipped with a
+   * warning.
+   *
+   * @param containerReport  -- Container report for a specific container from
+   *                         a datanode.
+   */
+  public void handleContainerReport(
+      ContainerReportsRequestProto containerReport) {
+    DatanodeID datanodeID = DatanodeID.getFromProtoBuf(
+        containerReport.getDatanodeID());
+    inProgressPoolListLock.readLock().lock();
+    try {
+      String poolName = poolManager.getNodePool(datanodeID);
+      for (InProgressPool ppool : inProgressPoolList) {
+        if (ppool.getPoolName().equalsIgnoreCase(poolName)) {
+          ppool.handleContainerReport(containerReport);
+          return;
+        }
+      }
+      // TODO: Decide if we can do anything else with this report.
+      LOG.debug("Discarding the container report for pool {}. " +
+          "That pool is not currently in the pool reconciliation process." +
+          " Container Name: {}", poolName, containerReport.getDatanodeID());
+    } catch (SCMException e) {
+      LOG.warn("Skipping processing container report from datanode {}, " +
+          "cause: failed to get the corresponding node pool",
+          datanodeID.toString(), e);
+    } finally {
+      inProgressPoolListLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Get in process pool list, used for testing.
+   * @return List of InProgressPool
+   */
+  @VisibleForTesting
+  public List<InProgressPool> getInProcessPoolList() {
+    return inProgressPoolList;
+  }
+
+  /**
+   * Shutdown the Container Replication Manager.
+ * @throws IOException if an I/O error occurs + */ + @Override + public void close() throws IOException { + setExit(); + HadoopExecutors.shutdown(executorService, LOG, 5, TimeUnit.SECONDS); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/74484754/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/InProgressPool.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/InProgressPool.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/InProgressPool.java index 24423a3..833d1a8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/InProgressPool.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/InProgressPool.java @@ -21,9 +21,10 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.ozone.protocol.commands.SendContainerCommand; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerInfo; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; -import org.apache.hadoop.ozone.scm.node.CommandQueue; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerInfo; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; import org.apache.hadoop.ozone.scm.node.NodeManager; import org.apache.hadoop.ozone.scm.node.NodePoolManager; import org.apache.hadoop.util.Time; @@ -39,10 +40,14 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; import 
java.util.stream.Collectors; -import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; -import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.HEALTHY; -import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.STALE; -import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.UNKNOWN; +import static com.google.common.util.concurrent.Uninterruptibles + .sleepUninterruptibly; +import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos + .NodeState.HEALTHY; +import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos + .NodeState.STALE; +import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos + .NodeState.UNKNOWN; /** * These are pools that are actively checking for replication status of the @@ -51,8 +56,8 @@ import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.UNKNO public final class InProgressPool { public static final Logger LOG = LoggerFactory.getLogger(InProgressPool.class); + private final PeriodicPool pool; - private final CommandQueue commandQueue; private final NodeManager nodeManager; private final NodePoolManager poolManager; private final ExecutorService executorService; @@ -70,22 +75,19 @@ public final class InProgressPool { * @param pool - Pool that we are working against * @param nodeManager - Nodemanager * @param poolManager - pool manager - * @param commandQueue - Command queue * @param executorService - Shared Executor service. 
*/ InProgressPool(long maxWaitTime, PeriodicPool pool, NodeManager nodeManager, NodePoolManager poolManager, - CommandQueue commandQueue, ExecutorService executorService) { + ExecutorService executorService) { Preconditions.checkNotNull(pool); Preconditions.checkNotNull(nodeManager); Preconditions.checkNotNull(poolManager); - Preconditions.checkNotNull(commandQueue); Preconditions.checkNotNull(executorService); Preconditions.checkArgument(maxWaitTime > 0); this.pool = pool; this.nodeManager = nodeManager; this.poolManager = poolManager; - this.commandQueue = commandQueue; this.executorService = executorService; this.containerCountMap = new ConcurrentHashMap<>(); this.processedNodeSet = new ConcurrentHashMap<>(); @@ -186,7 +188,7 @@ public final class InProgressPool { // Queue commands to all datanodes in this pool to send us container // report. Since we ignore dead nodes, it is possible that we would have // over replicated the container if the node comes back. - commandQueue.addCommand(id, cmd); + nodeManager.addDatanodeCommand(id, cmd); } } this.status = ProgressStatus.InProgress; @@ -235,7 +237,12 @@ public final class InProgressPool { */ public void handleContainerReport( ContainerReportsRequestProto containerReport) { - executorService.submit(processContainerReport(containerReport)); + if (status == ProgressStatus.InProgress) { + executorService.submit(processContainerReport(containerReport)); + } else { + LOG.debug("Cannot handle container report when the pool is in {} status.", + status); + } } private Runnable processContainerReport( @@ -292,6 +299,11 @@ public final class InProgressPool { return pool.getPoolName(); } + public void finalizeReconciliation() { + status = ProgressStatus.Done; + //TODO: Add finalizing logic. This is where actual reconciliation happens. + } + /** * Current status of the computing replication status. 
*/ http://git-wip-us.apache.org/repos/asf/hadoop/blob/74484754/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java index 266428b..ce032b4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java @@ -123,6 +123,12 @@ public interface NodeManager extends StorageContainerNodeProtocol, SCMNodeMetric getNodeStat(DatanodeID datanodeID); /** + * Returns the NodePoolManager associated with the NodeManager. + * @return NodePoolManager + */ + NodePoolManager getNodePoolManager(); + + /** * Wait for the heartbeat is processed by NodeManager. * @return true if heartbeat has been processed. 
*/ http://git-wip-us.apache.org/repos/asf/hadoop/blob/74484754/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java index ed894cb..129e65d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java @@ -858,6 +858,11 @@ public class SCMNodeManager } @Override + public NodePoolManager getNodePoolManager() { + return nodePoolManager; + } + + @Override public Map<String, Integer> getNodeCount() { Map<String, Integer> nodeCountMap = new HashMap<String, Integer>(); for(NodeState state : NodeState.values()) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/74484754/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationNodeManagerMock.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationNodeManagerMock.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationNodeManagerMock.java index 94f2a17..d9bf587 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationNodeManagerMock.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationNodeManagerMock.java @@ -29,26 +29,32 @@ import org.apache.hadoop.ozone.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; import 
org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeStat; +import org.apache.hadoop.ozone.scm.node.CommandQueue; import org.apache.hadoop.ozone.scm.node.NodeManager; import java.io.IOException; import java.util.List; import java.util.Map; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState; +import org.apache.hadoop.ozone.scm.node.NodePoolManager; +import org.mockito.Mockito; /** * A Node Manager to test replication. */ public class ReplicationNodeManagerMock implements NodeManager { private final Map<DatanodeID, NodeState> nodeStateMap; + private final CommandQueue commandQueue; /** * A list of Datanodes and current states. * @param nodeState A node state map. */ - public ReplicationNodeManagerMock(Map<DatanodeID, NodeState> nodeState) { + public ReplicationNodeManagerMock(Map<DatanodeID, NodeState> nodeState, + CommandQueue commandQueue) { Preconditions.checkNotNull(nodeState); - nodeStateMap = nodeState; + this.nodeStateMap = nodeState; + this.commandQueue = commandQueue; } /** @@ -194,6 +200,11 @@ public class ReplicationNodeManagerMock implements NodeManager { return null; } + @Override + public NodePoolManager getNodePoolManager() { + return Mockito.mock(NodePoolManager.class); + } + /** * Wait for the heartbeat is processed by NodeManager. 
* @@ -304,4 +315,9 @@ public class ReplicationNodeManagerMock implements NodeManager { nodeStateMap.put(id, state); } + @Override + public void addDatanodeCommand(DatanodeID id, SCMCommand command) { + this.commandQueue.addCommand(id, command); + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/74484754/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerReplicationManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerReplicationManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerReplicationManager.java deleted file mode 100644 index 0e36339..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerReplicationManager.java +++ /dev/null @@ -1,264 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ -package org.apache.hadoop.ozone.container.replication; - -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.conf.OzoneConfiguration; -import org.apache.hadoop.ozone.container.TestUtils - .ReplicationDatanodeStateManager; -import org.apache.hadoop.ozone.container.TestUtils.ReplicationNodeManagerMock; -import org.apache.hadoop.ozone.container.TestUtils - .ReplicationNodePoolManagerMock; -import org.apache.hadoop.ozone.container.common.SCMTestUtils; -import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState; -import org.apache.hadoop.ozone.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; -import org.apache.hadoop.ozone.scm.container.replication - .ContainerReplicationManager; -import org.apache.hadoop.ozone.scm.container.replication.InProgressPool; -import org.apache.hadoop.ozone.scm.node.CommandQueue; -import org.apache.hadoop.ozone.scm.node.NodeManager; -import org.apache.hadoop.ozone.scm.node.NodePoolManager; -import org.apache.hadoop.test.GenericTestUtils; -import org.apache.hadoop.test.GenericTestUtils.LogCapturer; -import org.slf4j.event.Level; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import java.io.IOException; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.HEALTHY; -import static org.apache.hadoop.scm.ScmConfigKeys - .OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT; -import static org.apache.ratis.shaded.com.google.common.util.concurrent - .Uninterruptibles.sleepUninterruptibly; - -/** - * Tests for the container manager. 
- */ -public class TestContainerReplicationManager { - final static String POOL_NAME_TEMPLATE = "Pool%d"; - static final int MAX_DATANODES = 72; - static final int POOL_SIZE = 24; - static final int POOL_COUNT = 3; - private LogCapturer logCapturer = LogCapturer.captureLogs( - LogFactory.getLog(ContainerReplicationManager.class)); - private List<DatanodeID> datanodes = new LinkedList<>(); - private NodeManager nodeManager; - private NodePoolManager poolManager; - private CommandQueue commandQueue; - private ContainerReplicationManager replicationManager; - private ReplicationDatanodeStateManager datanodeStateManager; - - @After - public void tearDown() throws Exception { - logCapturer.stopCapturing(); - GenericTestUtils.setLogLevel(ContainerReplicationManager.LOG, Level.INFO); - } - - @Before - public void setUp() throws Exception { - GenericTestUtils.setLogLevel(ContainerReplicationManager.LOG, Level.DEBUG); - Map<DatanodeID, NodeState> nodeStateMap = new HashMap<>(); - // We are setting up 3 pools with 24 nodes each in this cluster. - // First we create 72 Datanodes. - for (int x = 0; x < MAX_DATANODES; x++) { - DatanodeID datanode = SCMTestUtils.getDatanodeID(); - datanodes.add(datanode); - nodeStateMap.put(datanode, HEALTHY); - } - - // All nodes in this cluster are healthy for time being. - nodeManager = new ReplicationNodeManagerMock(nodeStateMap); - poolManager = new ReplicationNodePoolManagerMock(); - commandQueue = new CommandQueue(); - - Assert.assertEquals("Max datanodes should be equal to POOL_SIZE * " + - "POOL_COUNT", POOL_COUNT * POOL_SIZE, MAX_DATANODES); - - // Start from 1 instead of zero so we can multiply and get the node index. 
- for (int y = 1; y <= POOL_COUNT; y++) { - String poolName = String.format(POOL_NAME_TEMPLATE, y); - for (int z = 0; z < POOL_SIZE; z++) { - DatanodeID id = datanodes.get(y * z); - poolManager.addNode(poolName, id); - } - } - OzoneConfiguration config = SCMTestUtils.getOzoneConf(); - config.setTimeDuration(OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT, 1, - TimeUnit.SECONDS); - replicationManager = new ContainerReplicationManager(config, - nodeManager, poolManager, commandQueue); - datanodeStateManager = new ReplicationDatanodeStateManager(nodeManager, - poolManager); - // Sleep for one second to make sure all threads get time to run. - sleepUninterruptibly(1, TimeUnit.SECONDS); - } - - @Test - /** - * Asserts that at least one pool is picked up for processing. - */ - public void testAssertPoolsAreProcessed() { - // This asserts that replication manager has started processing at least - // one pool. - Assert.assertTrue(replicationManager.getInProgressPoolCount() > 0); - - // Since all datanodes are flagged as healthy in this test, for each - // datanode we must have queued a command. - Assert.assertEquals("Commands are in queue :", commandQueue - .getCommandsInQueue(), POOL_SIZE * replicationManager - .getInProgressPoolCount()); - } - - @Test - /** - * This test sends container reports for 2 containers to a pool in progress. - * Asserts that we are able to find a container with single replica and do - * not find container with 3 replicas. - */ - public void testDetectSingleContainerReplica() throws TimeoutException, - InterruptedException { - String singleNodeContainer = "SingleNodeContainer"; - String threeNodeContainer = "ThreeNodeContainer"; - InProgressPool ppool = replicationManager.getInProcessPoolList().get(0); - // Only single datanode reporting that "SingleNodeContainer" exists. 
- List<ContainerReportsRequestProto> clist = - datanodeStateManager.getContainerReport(singleNodeContainer, - ppool.getPool().getPoolName(), 1); - ppool.handleContainerReport(clist.get(0)); - - // Three nodes are going to report that ThreeNodeContainer exists. - clist = datanodeStateManager.getContainerReport(threeNodeContainer, - ppool.getPool().getPoolName(), 3); - - for (ContainerReportsRequestProto reportsProto : clist) { - ppool.handleContainerReport(reportsProto); - } - GenericTestUtils.waitFor(() -> ppool.getContainerProcessedCount() == 4, - 200, 1000); - ppool.setDoneProcessing(); - - List<Map.Entry<String, Integer>> containers = ppool.filterContainer(p -> p - .getValue() == 1); - Assert.assertEquals(singleNodeContainer, containers.get(0).getKey()); - int count = containers.get(0).getValue(); - Assert.assertEquals(1L, count); - } - - @Test - /** - * We create three containers, Normal,OveReplicated and WayOverReplicated - * containers. This test asserts that we are able to find the - * over replicated containers. 
- */ - public void testDetectOverReplica() throws TimeoutException, - InterruptedException { - String normalContainer = "NormalContainer"; - String overReplicated = "OverReplicatedContainer"; - String wayOverReplicated = "WayOverReplicated"; - InProgressPool ppool = replicationManager.getInProcessPoolList().get(0); - - List<ContainerReportsRequestProto> clist = - datanodeStateManager.getContainerReport(normalContainer, - ppool.getPool().getPoolName(), 3); - ppool.handleContainerReport(clist.get(0)); - - clist = datanodeStateManager.getContainerReport(overReplicated, - ppool.getPool().getPoolName(), 4); - - for (ContainerReportsRequestProto reportsProto : clist) { - ppool.handleContainerReport(reportsProto); - } - - clist = datanodeStateManager.getContainerReport(wayOverReplicated, - ppool.getPool().getPoolName(), 7); - - for (ContainerReportsRequestProto reportsProto : clist) { - ppool.handleContainerReport(reportsProto); - } - - // We ignore container reports from the same datanodes. - // it is possible that these each of these containers get placed - // on same datanodes, so allowing for 4 duplicates in the set of 14. - GenericTestUtils.waitFor(() -> ppool.getContainerProcessedCount() > 10, - 200, 1000); - ppool.setDoneProcessing(); - - List<Map.Entry<String, Integer>> containers = ppool.filterContainer(p -> p - .getValue() > 3); - Assert.assertEquals(2, containers.size()); - } - - @Test - /** - * This test verifies that all pools are picked up for replica processing. - * - */ - public void testAllPoolsAreProcessed() throws TimeoutException, - InterruptedException { - // Verify that we saw all three pools being picked up for processing. 
- GenericTestUtils.waitFor(() -> replicationManager.getPoolProcessCount() - >= 3, 200, 15 * 1000); - Assert.assertTrue(logCapturer.getOutput().contains("Pool1") && - logCapturer.getOutput().contains("Pool2") && - logCapturer.getOutput().contains("Pool3")); - } - - @Test - /** - * Adds a new pool and tests that we are able to pick up that new pool for - * processing as well as handle container reports for datanodes in that pool. - * @throws TimeoutException - * @throws InterruptedException - */ - public void testAddingNewPoolWorks() - throws TimeoutException, InterruptedException, IOException { - LogCapturer inProgressLog = LogCapturer.captureLogs( - LogFactory.getLog(InProgressPool.class)); - GenericTestUtils.setLogLevel(InProgressPool.LOG, Level.DEBUG); - try { - DatanodeID id = SCMTestUtils.getDatanodeID(); - ((ReplicationNodeManagerMock) (nodeManager)).addNode(id, HEALTHY); - poolManager.addNode("PoolNew", id); - GenericTestUtils.waitFor(() -> - logCapturer.getOutput().contains("PoolNew"), - 200, 15 * 1000); - - // Assert that we are able to send a container report to this new - // pool and datanode. 
- List<ContainerReportsRequestProto> clist = - datanodeStateManager.getContainerReport("NewContainer1", - "PoolNew", 1); - replicationManager.handleContainerReport(clist.get(0)); - GenericTestUtils.waitFor(() -> - inProgressLog.getOutput().contains("NewContainer1") && inProgressLog - .getOutput().contains(id.getDatanodeUuid()), 200, 10 * 1000); - } finally { - inProgressLog.stopCapturing(); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/74484754/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerSupervisor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerSupervisor.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerSupervisor.java new file mode 100644 index 0000000..8927596 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerSupervisor.java @@ -0,0 +1,269 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.hadoop.ozone.container.replication; + +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.container.TestUtils + .ReplicationDatanodeStateManager; +import org.apache.hadoop.ozone.container.TestUtils.ReplicationNodeManagerMock; +import org.apache.hadoop.ozone.container.TestUtils + .ReplicationNodePoolManagerMock; +import org.apache.hadoop.ozone.container.common.SCMTestUtils; +import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; +import org.apache.hadoop.ozone.scm.container.replication.ContainerSupervisor; +import org.apache.hadoop.ozone.scm.container.replication.InProgressPool; +import org.apache.hadoop.ozone.scm.node.CommandQueue; +import org.apache.hadoop.ozone.scm.node.NodeManager; +import org.apache.hadoop.ozone.scm.node.NodePoolManager; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.GenericTestUtils.LogCapturer; +import org.slf4j.event.Level; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState.HEALTHY; +import static org.apache.hadoop.scm.ScmConfigKeys + .OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT; +import static org.apache.hadoop.scm.ScmConfigKeys + .OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL; +import static org.apache.ratis.shaded.com.google.common.util.concurrent + .Uninterruptibles.sleepUninterruptibly; + +/** + * Tests for the container manager. 
+ */ +public class TestContainerSupervisor { + final static String POOL_NAME_TEMPLATE = "Pool%d"; + static final int MAX_DATANODES = 72; + static final int POOL_SIZE = 24; + static final int POOL_COUNT = 3; + private LogCapturer logCapturer = LogCapturer.captureLogs( + LogFactory.getLog(ContainerSupervisor.class)); + private List<DatanodeID> datanodes = new LinkedList<>(); + private NodeManager nodeManager; + private NodePoolManager poolManager; + private CommandQueue commandQueue; + private ContainerSupervisor containerSupervisor; + private ReplicationDatanodeStateManager datanodeStateManager; + + @After + public void tearDown() throws Exception { + logCapturer.stopCapturing(); + GenericTestUtils.setLogLevel(ContainerSupervisor.LOG, Level.INFO); + } + + @Before + public void setUp() throws Exception { + GenericTestUtils.setLogLevel(ContainerSupervisor.LOG, Level.DEBUG); + Map<DatanodeID, NodeState> nodeStateMap = new HashMap<>(); + // We are setting up 3 pools with 24 nodes each in this cluster. + // First we create 72 Datanodes. + for (int x = 0; x < MAX_DATANODES; x++) { + DatanodeID datanode = SCMTestUtils.getDatanodeID(); + datanodes.add(datanode); + nodeStateMap.put(datanode, HEALTHY); + } + + commandQueue = new CommandQueue(); + + // All nodes in this cluster are healthy for time being. + nodeManager = new ReplicationNodeManagerMock(nodeStateMap, commandQueue); + poolManager = new ReplicationNodePoolManagerMock(); + + + Assert.assertEquals("Max datanodes should be equal to POOL_SIZE * " + + "POOL_COUNT", POOL_COUNT * POOL_SIZE, MAX_DATANODES); + + // Start from 1 instead of zero so we can multiply and get the node index. 
+ for (int y = 1; y <= POOL_COUNT; y++) { + String poolName = String.format(POOL_NAME_TEMPLATE, y); + for (int z = 0; z < POOL_SIZE; z++) { + DatanodeID id = datanodes.get(y * z); + poolManager.addNode(poolName, id); + } + } + OzoneConfiguration config = SCMTestUtils.getOzoneConf(); + config.setTimeDuration(OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT, 2, + TimeUnit.SECONDS); + config.setTimeDuration(OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL, 1, + TimeUnit.SECONDS); + containerSupervisor = new ContainerSupervisor(config, + nodeManager, poolManager); + datanodeStateManager = new ReplicationDatanodeStateManager(nodeManager, + poolManager); + // Sleep for one second to make sure all threads get time to run. + sleepUninterruptibly(1, TimeUnit.SECONDS); + } + + @Test + /** + * Asserts that at least one pool is picked up for processing. + */ + public void testAssertPoolsAreProcessed() { + // This asserts that replication manager has started processing at least + // one pool. + Assert.assertTrue(containerSupervisor.getInProgressPoolCount() > 0); + + // Since all datanodes are flagged as healthy in this test, for each + // datanode we must have queued a command. + Assert.assertEquals("Commands are in queue :", + POOL_SIZE * containerSupervisor.getInProgressPoolCount(), + commandQueue.getCommandsInQueue()); + } + + @Test + /** + * This test sends container reports for 2 containers to a pool in progress. + * Asserts that we are able to find a container with single replica and do + * not find container with 3 replicas. + */ + public void testDetectSingleContainerReplica() throws TimeoutException, + InterruptedException { + String singleNodeContainer = "SingleNodeContainer"; + String threeNodeContainer = "ThreeNodeContainer"; + InProgressPool ppool = containerSupervisor.getInProcessPoolList().get(0); + // Only single datanode reporting that "SingleNodeContainer" exists. 
+ List<ContainerReportsRequestProto> clist = + datanodeStateManager.getContainerReport(singleNodeContainer, + ppool.getPool().getPoolName(), 1); + ppool.handleContainerReport(clist.get(0)); + + // Three nodes are going to report that ThreeNodeContainer exists. + clist = datanodeStateManager.getContainerReport(threeNodeContainer, + ppool.getPool().getPoolName(), 3); + + for (ContainerReportsRequestProto reportsProto : clist) { + ppool.handleContainerReport(reportsProto); + } + GenericTestUtils.waitFor(() -> ppool.getContainerProcessedCount() == 4, + 200, 1000); + ppool.setDoneProcessing(); + + List<Map.Entry<String, Integer>> containers = ppool.filterContainer(p -> p + .getValue() == 1); + Assert.assertEquals(singleNodeContainer, containers.get(0).getKey()); + int count = containers.get(0).getValue(); + Assert.assertEquals(1L, count); + } + + @Test + /** + * We create three containers, Normal,OveReplicated and WayOverReplicated + * containers. This test asserts that we are able to find the + * over replicated containers. 
+ */ + public void testDetectOverReplica() throws TimeoutException, + InterruptedException { + String normalContainer = "NormalContainer"; + String overReplicated = "OverReplicatedContainer"; + String wayOverReplicated = "WayOverReplicated"; + InProgressPool ppool = containerSupervisor.getInProcessPoolList().get(0); + + List<ContainerReportsRequestProto> clist = + datanodeStateManager.getContainerReport(normalContainer, + ppool.getPool().getPoolName(), 3); + ppool.handleContainerReport(clist.get(0)); + + clist = datanodeStateManager.getContainerReport(overReplicated, + ppool.getPool().getPoolName(), 4); + + for (ContainerReportsRequestProto reportsProto : clist) { + ppool.handleContainerReport(reportsProto); + } + + clist = datanodeStateManager.getContainerReport(wayOverReplicated, + ppool.getPool().getPoolName(), 7); + + for (ContainerReportsRequestProto reportsProto : clist) { + ppool.handleContainerReport(reportsProto); + } + + // We ignore container reports from the same datanodes. + // it is possible that these each of these containers get placed + // on same datanodes, so allowing for 4 duplicates in the set of 14. + GenericTestUtils.waitFor(() -> ppool.getContainerProcessedCount() > 10, + 200, 1000); + ppool.setDoneProcessing(); + + List<Map.Entry<String, Integer>> containers = ppool.filterContainer(p -> p + .getValue() > 3); + Assert.assertEquals(2, containers.size()); + } + + @Test + /** + * This test verifies that all pools are picked up for replica processing. + * + */ + public void testAllPoolsAreProcessed() throws TimeoutException, + InterruptedException { + // Verify that we saw all three pools being picked up for processing. 
+ GenericTestUtils.waitFor(() -> containerSupervisor.getPoolProcessCount() + >= 3, 200, 15 * 1000); + Assert.assertTrue(logCapturer.getOutput().contains("Pool1") && + logCapturer.getOutput().contains("Pool2") && + logCapturer.getOutput().contains("Pool3")); + } + + @Test + /** + * Adds a new pool and tests that we are able to pick up that new pool for + * processing as well as handle container reports for datanodes in that pool. + * @throws TimeoutException + * @throws InterruptedException + */ + public void testAddingNewPoolWorks() + throws TimeoutException, InterruptedException, IOException { + LogCapturer inProgressLog = LogCapturer.captureLogs( + LogFactory.getLog(InProgressPool.class)); + GenericTestUtils.setLogLevel(InProgressPool.LOG, Level.DEBUG); + try { + DatanodeID id = SCMTestUtils.getDatanodeID(); + ((ReplicationNodeManagerMock) (nodeManager)).addNode(id, HEALTHY); + poolManager.addNode("PoolNew", id); + GenericTestUtils.waitFor(() -> + logCapturer.getOutput().contains("PoolNew"), + 200, 15 * 1000); + + // Assert that we are able to send a container report to this new + // pool and datanode. 
+ List<ContainerReportsRequestProto> clist = + datanodeStateManager.getContainerReport("NewContainer1", + "PoolNew", 1); + containerSupervisor.handleContainerReport(clist.get(0)); + GenericTestUtils.waitFor(() -> + inProgressLog.getOutput().contains("NewContainer1") && inProgressLog + .getOutput().contains(id.getDatanodeUuid()), 200, 10 * 1000); + } finally { + inProgressLog.stopCapturing(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/74484754/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java index 270929e..1314126 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java @@ -35,6 +35,8 @@ import org.apache.hadoop.ozone.protocol.proto import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeStat; import org.apache.hadoop.ozone.scm.node.NodeManager; +import org.apache.hadoop.ozone.scm.node.NodePoolManager; +import org.mockito.Mockito; import java.io.IOException; import java.util.HashMap; @@ -269,6 +271,11 @@ public class MockNodeManager implements NodeManager { return new SCMNodeMetric(nodeMetricMap.get(datanodeID.toString())); } + @Override + public NodePoolManager getNodePoolManager() { + return Mockito.mock(NodePoolManager.class); + } + /** * Used for testing. 
* http://git-wip-us.apache.org/repos/asf/hadoop/blob/74484754/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/TestContainerMapping.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/TestContainerMapping.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/TestContainerMapping.java index ac8dee9..56085e7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/TestContainerMapping.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/TestContainerMapping.java @@ -203,8 +203,7 @@ public class TestContainerMapping { } @Test - public void testFullContainerReport() throws IOException, - InterruptedException { + public void testFullContainerReport() throws IOException { String containerName = UUID.randomUUID().toString(); ContainerInfo info = createContainer(containerName); DatanodeID datanodeID = SCMTestUtils.getDatanodeID(); @@ -227,7 +226,13 @@ public class TestContainerMapping { .setContainerID(info.getContainerID()); reports.add(ciBuilder.build()); - mapping.processContainerReports(datanodeID, reportType, reports); + + ContainerReportsRequestProto.Builder crBuilder = + ContainerReportsRequestProto.newBuilder(); + crBuilder.setDatanodeID(datanodeID.getProtoBufMessage()) + .setType(reportType).addAllReports(reports); + + mapping.processContainerReports(crBuilder.build()); ContainerInfo updatedContainer = mapping.getContainer(containerName); Assert.assertEquals(100000000L, updatedContainer.getNumberOfKeys()); @@ -260,7 +265,12 @@ public class TestContainerMapping { reports.add(ciBuilder.build()); - mapping.processContainerReports(datanodeID, reportType, reports); + ContainerReportsRequestProto.Builder crBuilder = + ContainerReportsRequestProto.newBuilder(); + 
crBuilder.setDatanodeID(datanodeID.getProtoBufMessage()) + .setType(reportType).addAllReports(reports); + + mapping.processContainerReports(crBuilder.build()); ContainerInfo updatedContainer = mapping.getContainer(containerName); Assert.assertEquals(500000000L, updatedContainer.getNumberOfKeys()); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org