This is an automated email from the ASF dual-hosted git repository. nanda pushed a commit to branch HDDS-2823 in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
commit 58394ebdd22db72b8f51a7ed700ad0c54eff4e3d Author: Li Cheng <[email protected]> AuthorDate: Sat Oct 24 20:55:36 2020 +0530 HDDS-3837. Add isLeader check in SCMHAManager. --- .../hadoop/hdds/scm/block/BlockManagerImpl.java | 5 +- .../scm/container/CloseContainerEventHandler.java | 4 +- .../apache/hadoop/hdds/scm/ha/SCMHAManager.java | 13 ++ .../hadoop/hdds/scm/ha/SCMHAManagerImpl.java | 85 ++++++++++- .../org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java | 6 +- .../apache/hadoop/hdds/scm/ha/SCMRatisServer.java | 10 ++ .../hadoop/hdds/scm/ha/SCMRatisServerImpl.java | 15 ++ .../hadoop/hdds/scm/node/NewNodeHandler.java | 12 +- .../scm/node/NonHealthyToHealthyNodeHandler.java | 12 +- .../scm/pipeline/BackgroundPipelineCreator.java | 2 +- .../hadoop/hdds/scm/pipeline/PipelineManager.java | 5 +- .../hdds/scm/pipeline/PipelineManagerMXBean.java | 3 +- .../hdds/scm/pipeline/PipelineManagerV2Impl.java | 65 +++++++-- .../hadoop/hdds/scm/ha/MockSCMHAManager.java | 53 ++++++- .../scm/pipeline/TestPipelineActionHandler.java | 3 +- .../hdds/scm/pipeline/TestPipelineManagerImpl.java | 156 ++++++++++++++++++--- .../hdds/scm/safemode/TestSCMSafeModeManager.java | 2 +- 17 files changed, 401 insertions(+), 50 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java index b5b2aaf..ec0094b 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java @@ -57,6 +57,8 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVI import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT; + +import org.apache.ratis.protocol.NotLeaderException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -256,7 +258,8 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean { * @param containerInfo - Container Info. * @return AllocatedBlock */ - private AllocatedBlock newBlock(ContainerInfo containerInfo) { + private AllocatedBlock newBlock(ContainerInfo containerInfo) + throws NotLeaderException { try { final Pipeline pipeline = pipelineManager .getPipeline(containerInfo.getPipelineID()); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java index fd73711..a2b79fb 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; +import org.apache.ratis.protocol.NotLeaderException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -98,7 +99,7 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> { * @throws ContainerNotFoundException */ private List<DatanodeDetails> getNodes(final ContainerInfo container) - throws ContainerNotFoundException { + throws ContainerNotFoundException, NotLeaderException { try { return pipelineManager.getPipeline(container.getPipelineID()).getNodes(); } catch (PipelineNotFoundException ex) { @@ -109,5 +110,4 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> { .collect(Collectors.toList()); } } - } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java index eb6c800..ade0ad9 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java @@ -17,6 +17,9 @@ package org.apache.hadoop.hdds.scm.ha; +import org.apache.ratis.protocol.NotLeaderException; +import org.apache.ratis.protocol.RaftPeer; + import java.io.IOException; /** @@ -40,7 +43,17 @@ public interface SCMHAManager { SCMRatisServer getRatisServer(); /** + * Returns suggested leader from RaftServer. + */ + RaftPeer getSuggestedLeader(); + + /** * Stops the HA service. */ void shutdown() throws IOException; + + /** + * Returns NotLeaderException with useful info. + */ + NotLeaderException triggerNotLeaderException(); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java index 89ac714..8bb9457 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java @@ -17,7 +17,17 @@ package org.apache.hadoop.hdds.scm.ha; +import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.ratis.protocol.NotLeaderException; +import org.apache.ratis.protocol.RaftGroupMemberId; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.server.RaftServer; +import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.impl.RaftServerProxy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; @@ -31,14 +41,17 @@ import java.io.IOException; */ public class SCMHAManagerImpl implements SCMHAManager { - private static boolean isLeader = true; + private static final Logger LOG = + LoggerFactory.getLogger(SCMHAManagerImpl.class); private final SCMRatisServerImpl ratisServer; + private final ConfigurationSource conf; /** * Creates SCMHAManager instance. */ public SCMHAManagerImpl(final ConfigurationSource conf) throws IOException { + this.conf = conf; this.ratisServer = new SCMRatisServerImpl( conf.getObject(SCMHAConfiguration.class), conf); } @@ -56,7 +69,28 @@ public class SCMHAManagerImpl implements SCMHAManager { */ @Override public boolean isLeader() { - return isLeader; + if (!SCMHAUtils.isSCMHAEnabled(conf)) { + // When SCM HA is not enabled, the current SCM is always the leader. + return true; + } + RaftServer server = ratisServer.getServer(); + Preconditions.checkState(server instanceof RaftServerProxy); + RaftServerImpl serverImpl = null; + try { + // SCM only has one raft group. + serverImpl = ((RaftServerProxy) server) + .getImpl(ratisServer.getRaftGroupId()); + if (serverImpl != null) { + // Only when it's sure the current SCM is the leader, otherwise + // it should all return false. + return serverImpl.isLeader(); + } + } catch (IOException ioe) { + LOG.error("Fail to get RaftServer impl and therefore it's not clear " + + "whether it's leader. ", ioe); + } + + return false; } /** @@ -67,6 +101,42 @@ public class SCMHAManagerImpl implements SCMHAManager { return ratisServer; } + private RaftPeerId getPeerIdFromRoleInfo(RaftServerImpl serverImpl) { + if (serverImpl.isLeader()) { + return RaftPeerId.getRaftPeerId( + serverImpl.getRoleInfoProto().getLeaderInfo().toString()); + } else if (serverImpl.isFollower()) { + return RaftPeerId.valueOf( + serverImpl.getRoleInfoProto().getFollowerInfo() + .getLeaderInfo().getId().getId()); + } else { + return null; + } + } + + @Override + public RaftPeer getSuggestedLeader() { + RaftServer server = ratisServer.getServer(); + Preconditions.checkState(server instanceof RaftServerProxy); + RaftServerImpl serverImpl = null; + try { + // SCM only has one raft group. + serverImpl = ((RaftServerProxy) server) + .getImpl(ratisServer.getRaftGroupId()); + if (serverImpl != null) { + RaftPeerId peerId = getPeerIdFromRoleInfo(serverImpl); + if (peerId != null) { + return new RaftPeer(peerId); + } + return null; + } + } catch (IOException ioe) { + LOG.error("Fail to get RaftServer impl and therefore it's not clear " + + "whether it's leader. ", ioe); + } + return null; + } + /** * {@inheritDoc} */ @@ -75,4 +145,15 @@ public class SCMHAManagerImpl implements SCMHAManager { ratisServer.stop(); } + /** + * {@inheritDoc} + */ + @Override + public NotLeaderException triggerNotLeaderException() { + return new NotLeaderException(RaftGroupMemberId.valueOf( + ratisServer.getServer().getId(), + ratisServer.getRaftGroupId()), + getSuggestedLeader(), + ratisServer.getRaftPeers()); + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java index eb22566..0f71744 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java @@ -20,7 +20,7 @@ package org.apache.hadoop.hdds.scm.ha; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.HddsConfigKeys; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.ScmUtils; import org.apache.hadoop.hdds.scm.server.ratis.SCMRatisServer; @@ -37,12 +37,12 @@ public final class SCMHAUtils { } // Check if SCM HA is enabled. - public static boolean isSCMHAEnabled(OzoneConfiguration conf) { + public static boolean isSCMHAEnabled(ConfigurationSource conf) { return conf.getBoolean(ScmConfigKeys.OZONE_SCM_HA_ENABLE_KEY, ScmConfigKeys.OZONE_SCM_HA_ENABLE_DEFAULT); } - public static File createSCMRatisDir(OzoneConfiguration conf) + public static File createSCMRatisDir(ConfigurationSource conf) throws IllegalArgumentException { String scmRatisDir = SCMRatisServer.getSCMRatisDirectory(conf); if (scmRatisDir == null || scmRatisDir.isEmpty()) { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java index 4ddbc7b..2f99776 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java @@ -18,8 +18,12 @@ package org.apache.hadoop.hdds.scm.ha; import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType; +import org.apache.ratis.protocol.RaftGroupId; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.server.RaftServer; import java.io.IOException; +import java.util.List; import java.util.concurrent.ExecutionException; /** @@ -35,4 +39,10 @@ public interface SCMRatisServer { throws IOException, ExecutionException, InterruptedException; void stop() throws IOException; + + RaftServer getServer(); + + RaftGroupId getRaftGroupId(); + + List<RaftPeer> getRaftPeers(); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java index 45ae212..33ae109 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.UUID; import java.util.concurrent.ExecutionException; @@ -110,4 +111,18 @@ public class SCMRatisServerImpl implements SCMRatisServer { server.close(); } + @Override + public RaftServer getServer() { + return server; + } + + @Override + public RaftGroupId getRaftGroupId() { + return raftGroupId; + } + + @Override + public List<RaftPeer> getRaftPeers() { + return Collections.singletonList(new RaftPeer(raftPeerId)); + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java index a40a63a..42cada9 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java @@ -23,11 +23,16 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.ratis.protocol.NotLeaderException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Handles New Node event. */ public class NewNodeHandler implements EventHandler<DatanodeDetails> { + private static final Logger LOG = + LoggerFactory.getLogger(NewNodeHandler.class); private final PipelineManager pipelineManager; private final ConfigurationSource conf; @@ -41,6 +46,11 @@ public class NewNodeHandler implements EventHandler<DatanodeDetails> { @Override public void onMessage(DatanodeDetails datanodeDetails, EventPublisher publisher) { - pipelineManager.triggerPipelineCreation(); + try { + pipelineManager.triggerPipelineCreation(); + } catch (NotLeaderException ex) { + LOG.debug("Not the current leader SCM and cannot start pipeline" + + " creation."); + } } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NonHealthyToHealthyNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NonHealthyToHealthyNodeHandler.java index cc32f84..e73231b 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NonHealthyToHealthyNodeHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NonHealthyToHealthyNodeHandler.java @@ -23,12 +23,17 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.ratis.protocol.NotLeaderException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Handles Stale node event. */ public class NonHealthyToHealthyNodeHandler implements EventHandler<DatanodeDetails> { + private static final Logger LOG = + LoggerFactory.getLogger(NonHealthyToHealthyNodeHandler.class); private final PipelineManager pipelineManager; private final ConfigurationSource conf; @@ -42,6 +47,11 @@ public class NonHealthyToHealthyNodeHandler @Override public void onMessage(DatanodeDetails datanodeDetails, EventPublisher publisher) { - pipelineManager.triggerPipelineCreation(); + try { + pipelineManager.triggerPipelineCreation(); + } catch (NotLeaderException ex) { + LOG.debug("Not the current leader SCM and cannot start pipeline" + + " creation."); + } } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java index f240293..42b3a93 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java @@ -103,7 +103,7 @@ class BackgroundPipelineCreator { } } - private void createPipelines() { + private void createPipelines() throws RuntimeException { // TODO: #CLUTIL Different replication factor may need to be supported HddsProtos.ReplicationType type = HddsProtos.ReplicationType.valueOf( conf.get(OzoneConfigKeys.OZONE_REPLICATION_TYPE, diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java index 9c997a8..ddd461b 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus; import org.apache.hadoop.hdds.server.events.EventHandler; +import org.apache.ratis.protocol.NotLeaderException; /** * Interface which exposes the api for pipeline management. @@ -55,7 +56,7 @@ public interface PipelineManager extends Closeable, PipelineManagerMXBean, ReplicationFactor factor); List<Pipeline> getPipelines(ReplicationType type, - Pipeline.PipelineState state); + Pipeline.PipelineState state) throws NotLeaderException; List<Pipeline> getPipelines(ReplicationType type, ReplicationFactor factor, Pipeline.PipelineState state); @@ -84,7 +85,7 @@ public interface PipelineManager extends Closeable, PipelineManagerMXBean, void startPipelineCreator(); - void triggerPipelineCreation(); + void triggerPipelineCreation() throws NotLeaderException; void incNumBlocksAllocatedMetric(PipelineID id); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerMXBean.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerMXBean.java index 6d7d717..55e096b 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerMXBean.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerMXBean.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.pipeline; import org.apache.hadoop.hdds.annotation.InterfaceAudience; +import org.apache.ratis.protocol.NotLeaderException; import java.util.Map; @@ -33,6 +34,6 @@ public interface PipelineManagerMXBean { * Returns the number of pipelines in different state. * @return state to number of pipeline map */ - Map<String, Integer> getPipelineInfo(); + Map<String, Integer> getPipelineInfo() throws NotLeaderException; } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java index 1241745..069540c 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hdds.utils.Scheduler; import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.util.Time; +import org.apache.ratis.protocol.NotLeaderException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,19 +76,21 @@ public final class PipelineManagerV2Impl implements PipelineManager { private final SCMPipelineMetrics metrics; private long pipelineWaitDefaultTimeout; private final AtomicBoolean isInSafeMode; + private SCMHAManager scmhaManager; // Used to track if the safemode pre-checks have completed. This is designed // to prevent pipelines being created until sufficient nodes have registered. private final AtomicBoolean pipelineCreationAllowed; private PipelineManagerV2Impl(ConfigurationSource conf, - NodeManager nodeManager, - StateManager pipelineStateManager, - PipelineFactory pipelineFactory, + SCMHAManager scmhaManager, + StateManager pipelineStateManager, + PipelineFactory pipelineFactory, EventPublisher eventPublisher) { this.lock = new ReentrantReadWriteLock(); this.pipelineFactory = pipelineFactory; this.stateManager = pipelineStateManager; this.conf = conf; + this.scmhaManager = scmhaManager; this.eventPublisher = eventPublisher; this.pmInfoBean = MBeans.register("SCMPipelineManager", "SCMPipelineManagerInfo", this); @@ -120,7 +123,7 @@ public final class PipelineManagerV2Impl implements PipelineManager { nodeManager, stateManager, conf, eventPublisher); // Create PipelineManager PipelineManagerV2Impl pipelineManager = new PipelineManagerV2Impl(conf, - nodeManager, stateManager, pipelineFactory, eventPublisher); + scmhaManager, stateManager, pipelineFactory, eventPublisher); // Create background thread. Scheduler scheduler = new Scheduler( @@ -136,6 +139,7 @@ public final class PipelineManagerV2Impl implements PipelineManager { @Override public Pipeline createPipeline(ReplicationType type, ReplicationFactor factor) throws IOException { + checkLeader(); if (!isPipelineCreationAllowed() && factor != ReplicationFactor.ONE) { LOG.debug("Pipeline creation is not allowed until safe mode prechecks " + "complete"); @@ -266,6 +270,7 @@ public final class PipelineManagerV2Impl implements PipelineManager { @Override public void addContainerToPipeline( PipelineID pipelineID, ContainerID containerID) throws IOException { + checkLeader(); lock.writeLock().lock(); try { stateManager.addContainerToPipeline(pipelineID, containerID); @@ -277,6 +282,7 @@ public final class PipelineManagerV2Impl implements PipelineManager { @Override public void removeContainerFromPipeline( PipelineID pipelineID, ContainerID containerID) throws IOException { + checkLeader(); lock.writeLock().lock(); try { stateManager.removeContainerFromPipeline(pipelineID, containerID); @@ -288,6 +294,7 @@ public final class PipelineManagerV2Impl implements PipelineManager { @Override public NavigableSet<ContainerID> getContainersInPipeline( PipelineID pipelineID) throws IOException { + checkLeader(); lock.readLock().lock(); try { return stateManager.getContainers(pipelineID); @@ -298,11 +305,13 @@ public final class PipelineManagerV2Impl implements PipelineManager { @Override public int getNumberOfContainers(PipelineID pipelineID) throws IOException { + checkLeader(); return stateManager.getNumberOfContainers(pipelineID); } @Override public void openPipeline(PipelineID pipelineId) throws IOException { + checkLeader(); lock.writeLock().lock(); try { Pipeline pipeline = stateManager.getPipeline(pipelineId); @@ -328,6 +337,7 @@ public final class PipelineManagerV2Impl implements PipelineManager { * @throws IOException */ protected void removePipeline(Pipeline pipeline) throws IOException { + checkLeader(); pipelineFactory.close(pipeline.getType(), pipeline); PipelineID pipelineID = pipeline.getId(); lock.writeLock().lock(); @@ -349,6 +359,7 @@ public final class PipelineManagerV2Impl implements PipelineManager { */ protected void closeContainersForPipeline(final PipelineID pipelineId) throws IOException { + checkLeader(); Set<ContainerID> containerIDs = stateManager.getContainers(pipelineId); for (ContainerID containerID : containerIDs) { eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID); @@ -364,6 +375,7 @@ public final class PipelineManagerV2Impl implements PipelineManager { @Override public void closePipeline(Pipeline pipeline, boolean onTimeout) throws IOException { + checkLeader(); PipelineID pipelineID = pipeline.getId(); lock.writeLock().lock(); try { @@ -393,6 +405,7 @@ public final class PipelineManagerV2Impl implements PipelineManager { @Override public void scrubPipeline(ReplicationType type, ReplicationFactor factor) throws IOException { + checkLeader(); if (type != ReplicationType.RATIS || factor != ReplicationFactor.THREE) { // Only srub pipeline for RATIS THREE pipeline return; @@ -439,7 +452,9 @@ public final class PipelineManagerV2Impl implements PipelineManager { * Triggers pipeline creation after the specified time. */ @Override - public void triggerPipelineCreation() { + public void triggerPipelineCreation() throws NotLeaderException { + // TODO add checkLeader once follower validates safemode + // before it becomes leader. backgroundPipelineCreator.triggerPipelineCreation(); } @@ -457,6 +472,7 @@ public final class PipelineManagerV2Impl implements PipelineManager { @Override public void activatePipeline(PipelineID pipelineID) throws IOException { + checkLeader(); stateManager.updatePipelineState(pipelineID.getProtobuf(), HddsProtos.PipelineState.PIPELINE_OPEN); } @@ -470,6 +486,7 @@ public final class PipelineManagerV2Impl implements PipelineManager { @Override public void deactivatePipeline(PipelineID pipelineID) throws IOException { + checkLeader(); stateManager.updatePipelineState(pipelineID.getProtobuf(), HddsProtos.PipelineState.PIPELINE_DORMANT); } @@ -484,6 +501,7 @@ public final class PipelineManagerV2Impl implements PipelineManager { @Override public void waitPipelineReady(PipelineID pipelineID, long timeout) throws IOException { + checkLeader(); long st = Time.monotonicNow(); if (timeout == 0) { timeout = pipelineWaitDefaultTimeout; @@ -515,7 +533,8 @@ public final class PipelineManagerV2Impl implements PipelineManager { } @Override - public Map<String, Integer> getPipelineInfo() { + public Map<String, Integer> getPipelineInfo() throws NotLeaderException { + checkLeader(); final Map<String, Integer> pipelineInfo = new HashMap<>(); for (Pipeline.PipelineState state : Pipeline.PipelineState.values()) { pipelineInfo.put(state.toString(), 0); @@ -564,13 +583,21 @@ public final class PipelineManagerV2Impl implements PipelineManager { // Trigger pipeline creation only if the preCheck status has changed to // complete. - if (isPipelineCreationAllowed() && !currentAllowPipelines) { - triggerPipelineCreation(); - } - // Start the pipeline creation thread only when safemode switches off - if (!getSafeModeStatus() && currentlyInSafeMode) { - startPipelineCreator(); + + try { + if (isPipelineCreationAllowed() && !currentAllowPipelines) { + triggerPipelineCreation(); + } + // Start the pipeline creation thread only when safemode switches off + if (!getSafeModeStatus() && currentlyInSafeMode) { + startPipelineCreator(); + } + } catch (NotLeaderException ex) { + LOG.warn("Not the current leader SCM and cannot process pipeline" + + " creation. Suggested leader is: ", + scmhaManager.getSuggestedLeader().getAddress()); } + } @VisibleForTesting @@ -593,6 +620,20 @@ public final class PipelineManagerV2Impl implements PipelineManager { public StateManager getStateManager() { return stateManager; } + + public void setScmhaManager(SCMHAManager scmhaManager) { + this.scmhaManager = scmhaManager; + } + + /** + * Check if scm is current leader. + * @throws NotLeaderException when it's not the current leader. + */ + private void checkLeader() throws NotLeaderException { + if (!scmhaManager.isLeader()) { + throw scmhaManager.triggerNotLeaderException(); + } + } private void setBackgroundPipelineCreator( BackgroundPipelineCreator backgroundPipelineCreator) { diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java index c3b14fb..ce48c11 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java @@ -28,11 +28,14 @@ import com.google.protobuf.InvalidProtocolBufferException; import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType; import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.Message; +import org.apache.ratis.protocol.NotLeaderException; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftGroupMemberId; +import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.protocol.StateMachineException; +import org.apache.ratis.server.RaftServer; /** * Mock SCMHAManager implementation for testing. @@ -40,16 +43,30 @@ import org.apache.ratis.protocol.StateMachineException; public final class MockSCMHAManager implements SCMHAManager { private final SCMRatisServer ratisServer; + private boolean isLeader; public static SCMHAManager getInstance() { return new MockSCMHAManager(); } + public static SCMHAManager getLeaderInstance() { + MockSCMHAManager mockSCMHAManager = new MockSCMHAManager(); + mockSCMHAManager.setIsLeader(true); + return mockSCMHAManager; + } + + public static SCMHAManager getFollowerInstance() { + MockSCMHAManager mockSCMHAManager = new MockSCMHAManager(); + mockSCMHAManager.setIsLeader(false); + return mockSCMHAManager; + } + /** * Creates MockSCMHAManager instance. */ private MockSCMHAManager() { this.ratisServer = new MockRatisServer(); + this.isLeader = true; } @Override @@ -62,7 +79,16 @@ public final class MockSCMHAManager implements SCMHAManager { */ @Override public boolean isLeader() { - return true; + return isLeader; + } + + public void setIsLeader(boolean isLeader) { + this.isLeader = isLeader; + } + + @Override + public RaftPeer getSuggestedLeader() { + throw new UnsupportedOperationException(); } /** @@ -81,6 +107,16 @@ public final class MockSCMHAManager implements SCMHAManager { ratisServer.stop(); } + /** + * {@inheritDoc} + */ + @Override + public NotLeaderException triggerNotLeaderException() { + return new NotLeaderException(RaftGroupMemberId.valueOf( + RaftPeerId.valueOf("peer"), RaftGroupId.randomId()), + null, new ArrayList<>()); + } + private static class MockRatisServer implements SCMRatisServer { private Map<RequestType, Object> handlers = @@ -141,6 +177,21 @@ public final class MockSCMHAManager implements SCMHAManager { } @Override + public RaftServer getServer() { + return null; + } + + @Override + public RaftGroupId getRaftGroupId() { + return null; + } + + @Override + public List<RaftPeer> getRaftPeers() { + return new ArrayList<>(); + } + + @Override public void stop() { } } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineActionHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineActionHandler.java index 99443c3..e40c8ba 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineActionHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineActionHandler.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineActionsFromDatanode; import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; +import org.apache.ratis.protocol.NotLeaderException; import org.junit.Test; import org.mockito.Mockito; @@ -37,7 +38,7 @@ public class TestPipelineActionHandler { @Test public void testCloseActionForMissingPipeline() - throws PipelineNotFoundException { + throws PipelineNotFoundException, NotLeaderException { final PipelineManager manager = Mockito.mock(PipelineManager.class); final EventQueue queue = Mockito.mock(EventQueue.class); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java index e1f9104..a8f03bb 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.MockNodeManager; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager; +import org.apache.hadoop.hdds.scm.ha.SCMHAManager; import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition; import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher; @@ -37,6 +38,7 @@ import org.apache.hadoop.hdds.utils.db.DBStoreBuilder; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.ozone.container.common.SCMTestUtils; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.ratis.protocol.NotLeaderException; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -66,7 +68,6 @@ public class TestPipelineManagerImpl { private DBStore dbStore; private static MockNodeManager nodeManager; private static int maxPipelineCount; - private static EventQueue eventQueue; @Before public void init() throws Exception { @@ -76,7 +77,6 @@ public class TestPipelineManagerImpl { conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath()); dbStore = DBStoreBuilder.createDBStore(conf, new SCMDBDefinition()); nodeManager = new MockNodeManager(true, 20); - eventQueue = new EventQueue(); maxPipelineCount = nodeManager.getNodeCount(HddsProtos.NodeState.HEALTHY) * conf.getInt(OZONE_DATANODE_PIPELINE_LIMIT, OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT) / @@ -91,17 +91,23 @@ public class TestPipelineManagerImpl { FileUtil.fullyDelete(testDir); } - private PipelineManagerV2Impl createPipelineManager() + private PipelineManagerV2Impl createPipelineManager(boolean leader) throws IOException { - return PipelineManagerV2Impl.newPipelineManager( - conf, MockSCMHAManager.getInstance(), - nodeManager, - SCMDBDefinition.PIPELINES.getTable(dbStore), eventQueue); + SCMHAManager scmhaManager; + if (leader) { + scmhaManager = MockSCMHAManager.getLeaderInstance(); + } else { + scmhaManager = MockSCMHAManager.getFollowerInstance(); + } + return PipelineManagerV2Impl.newPipelineManager(conf, scmhaManager, + new MockNodeManager(true, 20), + SCMDBDefinition.PIPELINES.getTable(dbStore), + new EventQueue()); } @Test public void testCreatePipeline() throws Exception { - PipelineManagerV2Impl pipelineManager = createPipelineManager(); + PipelineManagerV2Impl pipelineManager = createPipelineManager(true); Assert.assertTrue(pipelineManager.getPipelines().isEmpty()); pipelineManager.allowPipelineCreation(); Pipeline pipeline1 = pipelineManager.createPipeline( @@ -115,7 +121,7 @@ public class TestPipelineManagerImpl { Assert.assertTrue(pipelineManager.containsPipeline(pipeline2.getId())); pipelineManager.close(); - PipelineManagerV2Impl pipelineManager2 = createPipelineManager(); + PipelineManagerV2Impl pipelineManager2 = createPipelineManager(true); // Should be able to load previous pipelines. Assert.assertFalse(pipelineManager.getPipelines().isEmpty()); Assert.assertEquals(2, pipelineManager.getPipelines().size()); @@ -129,8 +135,24 @@ public class TestPipelineManagerImpl { } @Test + public void testCreatePipelineShouldFailOnFollower() throws Exception { + PipelineManagerV2Impl pipelineManager = createPipelineManager(false); + Assert.assertTrue(pipelineManager.getPipelines().isEmpty()); + pipelineManager.allowPipelineCreation(); + try { + pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.THREE); + } catch (NotLeaderException ex) { + pipelineManager.close(); + return; + } + // Should not reach here. + Assert.fail(); + } + + @Test public void testUpdatePipelineStates() throws Exception { - PipelineManagerV2Impl pipelineManager = createPipelineManager(); + PipelineManagerV2Impl pipelineManager = createPipelineManager(true); pipelineManager.allowPipelineCreation(); Pipeline pipeline = pipelineManager.createPipeline( HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE); @@ -164,8 +186,71 @@ public class TestPipelineManagerImpl { } @Test + public void testOpenPipelineShouldFailOnFollower() throws Exception { + PipelineManagerV2Impl pipelineManager = createPipelineManager(true); + pipelineManager.allowPipelineCreation(); + Pipeline pipeline = pipelineManager.createPipeline( + HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE); + Assert.assertEquals(1, pipelineManager.getPipelines().size()); + Assert.assertTrue(pipelineManager.containsPipeline(pipeline.getId())); + Assert.assertEquals(ALLOCATED, pipeline.getPipelineState()); + // Change to follower + pipelineManager.setScmhaManager(MockSCMHAManager.getFollowerInstance()); + try { + pipelineManager.openPipeline(pipeline.getId()); + } catch (NotLeaderException ex) { + pipelineManager.close(); + return; + } + // Should not reach here. + Assert.fail(); + } + + @Test + public void testActivatePipelineShouldFailOnFollower() throws Exception { + PipelineManagerV2Impl pipelineManager = createPipelineManager(true); + pipelineManager.allowPipelineCreation(); + Pipeline pipeline = pipelineManager.createPipeline( + HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE); + Assert.assertEquals(1, pipelineManager.getPipelines().size()); + Assert.assertTrue(pipelineManager.containsPipeline(pipeline.getId())); + Assert.assertEquals(ALLOCATED, pipeline.getPipelineState()); + // Change to follower + pipelineManager.setScmhaManager(MockSCMHAManager.getFollowerInstance()); + try { + pipelineManager.activatePipeline(pipeline.getId()); + } catch (NotLeaderException ex) { + pipelineManager.close(); + return; + } + // Should not reach here. + Assert.fail(); + } + + @Test + public void testDeactivatePipelineShouldFailOnFollower() throws Exception { + PipelineManagerV2Impl pipelineManager = createPipelineManager(true); + pipelineManager.allowPipelineCreation(); + Pipeline pipeline = pipelineManager.createPipeline( + HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE); + Assert.assertEquals(1, pipelineManager.getPipelines().size()); + Assert.assertTrue(pipelineManager.containsPipeline(pipeline.getId())); + Assert.assertEquals(ALLOCATED, pipeline.getPipelineState()); + // Change to follower + pipelineManager.setScmhaManager(MockSCMHAManager.getFollowerInstance()); + try { + pipelineManager.deactivatePipeline(pipeline.getId()); + } catch (NotLeaderException ex) { + pipelineManager.close(); + return; + } + // Should not reach here. + Assert.fail(); + } + + @Test public void testRemovePipeline() throws Exception { - PipelineManagerV2Impl pipelineManager = createPipelineManager(); + PipelineManagerV2Impl pipelineManager = createPipelineManager(true); pipelineManager.allowPipelineCreation(); // Create a pipeline Pipeline pipeline = pipelineManager.createPipeline( @@ -207,12 +292,33 @@ public class TestPipelineManagerImpl { } @Test + public void testClosePipelineShouldFailOnFollower() throws Exception { + PipelineManagerV2Impl pipelineManager = createPipelineManager(true); + pipelineManager.allowPipelineCreation(); + Pipeline pipeline = pipelineManager.createPipeline( + HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE); + Assert.assertEquals(1, pipelineManager.getPipelines().size()); + Assert.assertTrue(pipelineManager.containsPipeline(pipeline.getId())); + Assert.assertEquals(ALLOCATED, pipeline.getPipelineState()); + // Change to follower + pipelineManager.setScmhaManager(MockSCMHAManager.getFollowerInstance()); + try { + pipelineManager.closePipeline(pipeline, false); + } catch (NotLeaderException ex) { + pipelineManager.close(); + return; + } + // Should not reach here. + Assert.fail(); + } + + @Test public void testPipelineReport() throws Exception { - PipelineManagerV2Impl pipelineManager = createPipelineManager(); + PipelineManagerV2Impl pipelineManager = createPipelineManager(true); pipelineManager.allowPipelineCreation(); SCMSafeModeManager scmSafeModeManager = new SCMSafeModeManager(conf, new ArrayList<>(), pipelineManager, - eventQueue); + new EventQueue()); Pipeline pipeline = pipelineManager .createPipeline(HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE); @@ -258,7 +364,7 @@ public class TestPipelineManagerImpl { @Test public void testPipelineCreationFailedMetric() throws Exception { - PipelineManagerV2Impl pipelineManager = createPipelineManager(); + PipelineManagerV2Impl pipelineManager = createPipelineManager(true); pipelineManager.allowPipelineCreation(); // No pipeline at start @@ -313,7 +419,7 @@ public class TestPipelineManagerImpl { @Test public void testPipelineOpenOnlyWhenLeaderReported() throws Exception { - PipelineManagerV2Impl pipelineManager = createPipelineManager(); + PipelineManagerV2Impl pipelineManager = createPipelineManager(true); pipelineManager.allowPipelineCreation(); pipelineManager.onMessage( @@ -324,13 +430,13 @@ public class TestPipelineManagerImpl { // close manager pipelineManager.close(); // new pipeline manager loads the pipelines from the db in ALLOCATED state - pipelineManager = createPipelineManager(); + pipelineManager = createPipelineManager(true); Assert.assertEquals(Pipeline.PipelineState.ALLOCATED, pipelineManager.getPipeline(pipeline.getId()).getPipelineState()); SCMSafeModeManager scmSafeModeManager = new SCMSafeModeManager(new OzoneConfiguration(), - new ArrayList<>(), pipelineManager, eventQueue); + new ArrayList<>(), pipelineManager, new EventQueue()); PipelineReportHandler pipelineReportHandler = new PipelineReportHandler(scmSafeModeManager, pipelineManager, conf); @@ -362,7 +468,7 @@ public class TestPipelineManagerImpl { OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, -1, TimeUnit.MILLISECONDS); - PipelineManagerV2Impl pipelineManager = createPipelineManager(); + PipelineManagerV2Impl pipelineManager = createPipelineManager(true); pipelineManager.allowPipelineCreation(); Pipeline pipeline = pipelineManager .createPipeline(HddsProtos.ReplicationType.RATIS, @@ -388,6 +494,14 @@ public class TestPipelineManagerImpl { pipelineManager.close(); } + @Test (expected = NotLeaderException.class) + public void testScrubPipelineShouldFailOnFollower() throws Exception { + PipelineManagerV2Impl pipelineManager = createPipelineManager(false); + pipelineManager.allowPipelineCreation(); + pipelineManager.scrubPipeline(HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.THREE); + } + @Test public void testPipelineNotCreatedUntilSafeModePrecheck() throws Exception { // No timeout for pipeline scrubber. @@ -395,7 +509,7 @@ public class TestPipelineManagerImpl { OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, -1, TimeUnit.MILLISECONDS); - PipelineManagerV2Impl pipelineManager = createPipelineManager(); + PipelineManagerV2Impl pipelineManager = createPipelineManager(true); try { pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE); @@ -433,7 +547,7 @@ public class TestPipelineManagerImpl { OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, -1, TimeUnit.MILLISECONDS); - PipelineManagerV2Impl pipelineManager = createPipelineManager(); + PipelineManagerV2Impl pipelineManager = createPipelineManager(true); Assert.assertTrue(pipelineManager.getSafeModeStatus()); Assert.assertFalse(pipelineManager.isPipelineCreationAllowed()); // First pass pre-check as true, but safemode still on @@ -456,6 +570,6 @@ public class TestPipelineManagerImpl { boolean isLeader) { SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode report = TestUtils.getPipelineReportFromDatanode(dn, pipeline.getId(), isLeader); - pipelineReportHandler.onMessage(report, eventQueue); + pipelineReportHandler.onMessage(report, new EventQueue()); } } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java index 935dc77..0febf06 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java @@ -485,7 +485,7 @@ public class TestSCMSafeModeManager { @Test - public void testDisableSafeMode() { + public void testDisableSafeMode() throws IOException { OzoneConfiguration conf = new OzoneConfiguration(config); conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_ENABLED, false); PipelineManager pipelineManager = Mockito.mock(PipelineManager.class); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
