This is an automated email from the ASF dual-hosted git repository.
licheng pushed a commit to branch HDDS-2823
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/HDDS-2823 by this push:
new 4e1c93f HDDS-3192. Handle AllocateContainer operation for HA. (#975)
4e1c93f is described below
commit 4e1c93f8f0613a00bd6ad2a74453bcf6246a065f
Author: Nandakumar <[email protected]>
AuthorDate: Thu May 28 14:38:58 2020 +0530
HDDS-3192. Handle AllocateContainer operation for HA. (#975)
---
.../hdds/scm/container/ContainerManagerImpl.java | 19 +-
.../hdds/scm/container/ContainerManagerV2.java | 5 +-
.../scm/container/ContainerStateManagerImpl.java | 140 ++++++-------
.../scm/container/ContainerStateManagerV2.java | 76 ++++++-
.../scm/container/states/ContainerStateMap.java | 1 +
.../hadoop/hdds/scm/ha/SCMHAInvocationHandler.java | 6 +-
.../apache/hadoop/hdds/scm/ha/SCMHAManager.java | 40 +---
.../{SCMHAManager.java => SCMHAManagerImpl.java} | 26 +--
.../hadoop/hdds/scm/ha/SCMRatisResponse.java | 8 +
.../apache/hadoop/hdds/scm/ha/SCMRatisServer.java | 85 +-------
...SCMRatisServer.java => SCMRatisServerImpl.java} | 16 +-
.../scm/container/TestContainerManagerImpl.java | 91 ++++++++
.../hadoop/hdds/scm/ha/MockSCMHAManager.java | 148 +++++++++++++
.../hdds/scm/pipeline/MockPipelineManager.java | 228 +++++++++++++++++++++
14 files changed, 656 insertions(+), 233 deletions(-)
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
index 0404530..36b9a30 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
@@ -47,6 +47,10 @@ import org.slf4j.LoggerFactory;
*/
public class ContainerManagerImpl implements ContainerManagerV2 {
+ /*
+ * TODO: Introduce container level locks.
+ */
+
/**
*
*/
@@ -72,17 +76,18 @@ public class ContainerManagerImpl implements
ContainerManagerV2 {
*
*/
public ContainerManagerImpl(
- // Introduce builder for this class?
- final Configuration conf, final PipelineManager pipelineManager,
- final SCMHAManager scmhaManager,
+ final Configuration conf,
+ final SCMHAManager scmHaManager,
+ final PipelineManager pipelineManager,
final Table<ContainerID, ContainerInfo> containerStore)
throws IOException {
+ // Introduce builder for this class?
this.lock = new ReentrantReadWriteLock();
this.pipelineManager = pipelineManager;
- this.containerStateManager = ContainerStateManagerImpl.newBuilder()
+ this.containerStateManager = ContainerStateManagerImpl.newBuilder()
.setConfiguration(conf)
.setPipelineManager(pipelineManager)
- .setRatisServer(scmhaManager.getRatisServer())
+ .setRatisServer(scmHaManager.getRatisServer())
.setContainerStore(containerStore)
.build();
}
@@ -275,8 +280,8 @@ public class ContainerManagerImpl implements
ContainerManagerV2 {
}
@Override
- public void close() throws IOException {
- throw new UnsupportedOperationException("Not yet implemented!");
+ public void close() throws Exception {
+ containerStateManager.close();
}
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerV2.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerV2.java
index 37c7b70..863ca4d 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerV2.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerV2.java
@@ -16,7 +16,6 @@
*/
package org.apache.hadoop.hdds.scm.container;
-import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
@@ -36,8 +35,8 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
* mapping. This is used by SCM when allocating new locations and when
* looking up a key.
*/
-public interface ContainerManagerV2 extends Closeable {
-
+public interface ContainerManagerV2 extends AutoCloseable {
+ // TODO: Rename this to ContainerManager
/**
* Returns all the container Ids managed by ContainerManager.
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java
index 16fe340..4f4456a 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hdds.scm.container;
import java.io.IOException;
-
import java.lang.reflect.Proxy;
import java.util.HashSet;
import java.util.Map;
@@ -27,128 +26,92 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
-import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
-import org.apache.hadoop.hdds.scm.ha.SCMHAInvocationHandler;
-import org.apache.hadoop.hdds.scm.ha.SCMRatisServer;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
-import org.apache.hadoop.hdds.utils.db.Table;
-import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
-import org.apache.hadoop.hdds.utils.db.TableIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.states.ContainerState;
import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
+import org.apache.hadoop.hdds.scm.ha.SCMHAInvocationHandler;
+import org.apache.hadoop.hdds.scm.ha.SCMRatisServer;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.apache.hadoop.ozone.common.statemachine.StateMachine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
- * TODO: Add javadoc.
+ * Default implementation of ContainerStateManager. This implementation
+ * holds the Container States in-memory which is backed by a persistent store.
+ * The persistent store is always kept in sync with the in-memory state
changes.
*/
public final class ContainerStateManagerImpl
implements ContainerStateManagerV2 {
- /* **********************************************************************
- * Container Life Cycle *
- * *
- * Event and State Transition Mapping: *
- * *
- * State: OPEN ----------------> CLOSING *
- * Event: FINALIZE *
- * *
- * State: CLOSING ----------------> QUASI_CLOSED *
- * Event: QUASI_CLOSE *
- * *
- * State: CLOSING ----------------> CLOSED *
- * Event: CLOSE *
- * *
- * State: QUASI_CLOSED ----------------> CLOSED *
- * Event: FORCE_CLOSE *
- * *
- * State: CLOSED ----------------> DELETING *
- * Event: DELETE *
- * *
- * State: DELETING ----------------> DELETED *
- * Event: CLEANUP *
- * *
- * *
- * Container State Flow: *
- * *
- * [OPEN]--------------->[CLOSING]--------------->[QUASI_CLOSED] *
- * (FINALIZE) | (QUASI_CLOSE) | *
- * | | *
- * | | *
- * (CLOSE) | (FORCE_CLOSE) | *
- * | | *
- * | | *
- * +--------->[CLOSED]<--------+ *
- * | *
- * (DELETE)| *
- * | *
- * | *
- * [DELETING] *
- * | *
- * (CLEANUP) | *
- * | *
- * V *
- * [DELETED] *
- * *
- ************************************************************************/
-
/**
- *
+ * Logger instance of ContainerStateManagerImpl.
*/
private static final Logger LOG = LoggerFactory.getLogger(
ContainerStateManagerImpl.class);
/**
- *
+ * Configured container size.
*/
private final long containerSize;
/**
- *
+ * The container ID sequence which is used to create new container.
+ * This will be removed once we have a Distributed Sequence ID Generator.
*/
+ @Deprecated
private final AtomicLong nextContainerID;
/**
- *
+ * In-memory representation of Container States.
*/
private final ContainerStateMap containers;
/**
- *
+ * Persistent store for Container States.
*/
- private final PipelineManager pipelineManager;
+ private Table<ContainerID, ContainerInfo> containerStore;
/**
- *
+ * PipelineManager instance.
*/
- private Table<ContainerID, ContainerInfo> containerStore;
+ private final PipelineManager pipelineManager;
/**
- *
+ * Container lifecycle state machine.
*/
private final StateMachine<LifeCycleState, LifeCycleEvent> stateMachine;
/**
- *
+ * We use the containers in round-robin fashion for operations like block
+ * allocation. This map is used for remembering the last used container.
*/
private final ConcurrentHashMap<ContainerState, ContainerID> lastUsedMap;
/**
+ * constructs ContainerStateManagerImpl instance and loads the containers
+ * form the persistent storage.
*
+ * @param conf the Configuration
+ * @param pipelineManager the {@link PipelineManager} instance
+ * @param containerStore the persistent storage
+ * @throws IOException in case of error while loading the containers
*/
private ContainerStateManagerImpl(final Configuration conf,
final PipelineManager pipelineManager,
@@ -158,7 +121,7 @@ public final class ContainerStateManagerImpl
this.containerStore = containerStore;
this.stateMachine = newStateMachine();
this.containerSize = getConfiguredContainerSize(conf);
- this.nextContainerID = new AtomicLong();
+ this.nextContainerID = new AtomicLong(1L);
this.containers = new ContainerStateMap();
this.lastUsedMap = new ConcurrentHashMap<>();
@@ -166,7 +129,9 @@ public final class ContainerStateManagerImpl
}
/**
+ * Creates and initializes a new Container Lifecycle StateMachine.
*
+ * @return the Container Lifecycle StateMachine
*/
private StateMachine<LifeCycleState, LifeCycleEvent> newStateMachine() {
@@ -208,7 +173,9 @@ public final class ContainerStateManagerImpl
}
/**
+ * Returns the configured container size.
*
+ * @return the max size of container
*/
private long getConfiguredContainerSize(final Configuration conf) {
return (long) conf.getStorageSize(
@@ -218,7 +185,9 @@ public final class ContainerStateManagerImpl
}
/**
+ * Loads the containers from container store into memory.
*
+ * @throws IOException in case of error while loading the containers
*/
private void initialize() throws IOException {
TableIterator<ContainerID, ? extends KeyValue<ContainerID, ContainerInfo>>
@@ -282,16 +251,20 @@ public final class ContainerStateManagerImpl
Preconditions.checkNotNull(containerInfo);
final ContainerInfo container = ContainerInfo.fromProtobuf(containerInfo);
- if (getContainer(container.containerID()) == null) {
- Preconditions.checkArgument(nextContainerID.get()
- == container.containerID().getId(),
- "ContainerID mismatch.");
-
- pipelineManager.addContainerToPipeline(
- container.getPipelineID(), container.containerID());
- containers.addContainer(container);
- nextContainerID.incrementAndGet();
- }
+ final ContainerID containerID = container.containerID();
+ final PipelineID pipelineID = container.getPipelineID();
+
+ /*
+ * TODO:
+ * Check if the container already exist in in ContainerStateManager.
+ * This optimization can be done after moving ContainerNotFoundException
+ * from ContainerStateMap to ContainerManagerImpl.
+ */
+
+ containerStore.put(containerID, container);
+ containers.addContainer(container);
+ pipelineManager.addContainerToPipeline(pipelineID, containerID);
+ nextContainerID.incrementAndGet();
}
void updateContainerState(final ContainerID containerID,
@@ -337,7 +310,9 @@ public final class ContainerStateManagerImpl
throw new UnsupportedOperationException("Not yet implemented!");
}
- void close() throws IOException {
+ @Override
+ public void close() throws Exception {
+ containerStore.close();
}
public static Builder newBuilder() {
@@ -382,7 +357,6 @@ public final class ContainerStateManagerImpl
final ContainerStateManagerV2 csm = new ContainerStateManagerImpl(
conf, pipelineMgr, table);
- scmRatisServer.registerStateMachineHandler(RequestType.CONTAINER, csm);
final SCMHAInvocationHandler invocationHandler =
new SCMHAInvocationHandler(RequestType.CONTAINER, csm,
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerV2.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerV2.java
index 9960354..3520b01 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerV2.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerV2.java
@@ -17,20 +17,26 @@
package org.apache.hadoop.hdds.scm.container;
+import java.io.IOException;
+import java.util.Set;
+
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.scm.metadata.Replicate;
-import java.io.IOException;
-import java.util.Set;
-
/**
+ * A ContainerStateManager is responsible for keeping track of all the
+ * container and its state inside SCM, it also exposes methods to read and
+ * modify the container and its state.
*
- * TODO: Add proper javadoc.
+ * All the mutation operations are marked with {@link Replicate} annotation so
+ * that when SCM-HA is enabled, the mutations are replicated from leader SCM
+ * to the followers.
*
- * Implementation of methods marked with {@code @Replicate} annotation should
be
+ * When a method is marked with {@link Replicate} annotation it should follow
+ * the below rules.
*
- * 1. Idempotent
+ * 1. The method call should be Idempotent
* 2. Arguments should be of protobuf objects
* 3. Return type should be of protobuf object
* 4. The declaration should throw RaftException
@@ -38,13 +44,65 @@ import java.util.Set;
*/
public interface ContainerStateManagerV2 {
+ //TODO: Rename this to ContainerStateManager
+
+ /* **********************************************************************
+ * Container Life Cycle *
+ * *
+ * Event and State Transition Mapping: *
+ * *
+ * State: OPEN ----------------> CLOSING *
+ * Event: FINALIZE *
+ * *
+ * State: CLOSING ----------------> QUASI_CLOSED *
+ * Event: QUASI_CLOSE *
+ * *
+ * State: CLOSING ----------------> CLOSED *
+ * Event: CLOSE *
+ * *
+ * State: QUASI_CLOSED ----------------> CLOSED *
+ * Event: FORCE_CLOSE *
+ * *
+ * State: CLOSED ----------------> DELETING *
+ * Event: DELETE *
+ * *
+ * State: DELETING ----------------> DELETED *
+ * Event: CLEANUP *
+ * *
+ * *
+ * Container State Flow: *
+ * *
+ * [OPEN]--------------->[CLOSING]--------------->[QUASI_CLOSED] *
+ * (FINALIZE) | (QUASI_CLOSE) | *
+ * | | *
+ * | | *
+ * (CLOSE) | (FORCE_CLOSE) | *
+ * | | *
+ * | | *
+ * +--------->[CLOSED]<--------+ *
+ * | *
+ * (DELETE)| *
+ * | *
+ * | *
+ * [DELETING] *
+ * | *
+ * (CLEANUP) | *
+ * | *
+ * V *
+ * [DELETED] *
+ * *
+ ************************************************************************/
+
/**
- *
+ * Returns a new container ID which can be used for allocating a new
+ * container.
*/
ContainerID getNextContainerID();
/**
+ * Returns the ID of all the managed containers.
*
+ * @return Set of {@link ContainerID}
*/
Set<ContainerID> getContainerIDs();
@@ -72,4 +130,8 @@ public interface ContainerStateManagerV2 {
void addContainer(ContainerInfoProto containerInfo)
throws IOException;
+ /**
+ *
+ */
+ void close() throws Exception;
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
index 8cef966..d71049b 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
@@ -535,6 +535,7 @@ public class ContainerStateMap {
}
}
+ // TODO: Move container not found exception to upper layer.
private void checkIfContainerExist(ContainerID containerID)
throws ContainerNotFoundException {
if (!containerMap.containsKey(containerID)) {
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAInvocationHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAInvocationHandler.java
index c78c616..cbe2ce3 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAInvocationHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAInvocationHandler.java
@@ -50,6 +50,7 @@ public class SCMHAInvocationHandler implements
InvocationHandler {
this.requestType = requestType;
this.localHandler = localHandler;
this.ratisHandler = ratisHandler;
+ ratisHandler.registerStateMachineHandler(requestType, localHandler);
}
@Override
@@ -71,8 +72,9 @@ public class SCMHAInvocationHandler implements
InvocationHandler {
*/
private Object invokeLocal(Method method, Object[] args)
throws InvocationTargetException, IllegalAccessException {
- LOG.trace("Invoking method {} on target {}", method, localHandler);
- return method.invoke(method, args);
+ LOG.trace("Invoking method {} on target {} with arguments {}",
+ method, localHandler, args);
+ return method.invoke(localHandler, args);
}
/**
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
index b38fc43..eb6c800 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
@@ -17,60 +17,30 @@
package org.apache.hadoop.hdds.scm.ha;
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
-
import java.io.IOException;
/**
* SCMHAManager provides HA service for SCM.
- *
- * It uses Apache Ratis for HA implementation. We will have a 2N+1
- * node Ratis ring. The Ratis ring will have one Leader node and 2N follower
- * nodes.
- *
- * TODO
- *
*/
-public class SCMHAManager {
-
- private static boolean isLeader = true;
-
- private final SCMRatisServer ratisServer;
-
- /**
- * Creates SCMHAManager instance.
- */
- public SCMHAManager(final ConfigurationSource conf) throws IOException {
- this.ratisServer = new SCMRatisServer(
- conf.getObject(SCMHAConfiguration.class), conf);
- }
+public interface SCMHAManager {
/**
* Starts HA service.
*/
- public void start() throws IOException {
- ratisServer.start();
- }
+ void start() throws IOException;
/**
* Returns true if the current SCM is the leader.
*/
- public static boolean isLeader() {
- return isLeader;
- }
+ boolean isLeader();
/**
* Returns RatisServer instance associated with the SCM instance.
*/
- public SCMRatisServer getRatisServer() {
- return ratisServer;
- }
+ SCMRatisServer getRatisServer();
/**
* Stops the HA service.
*/
- public void shutdown() throws IOException {
- ratisServer.stop();
- }
-
+ void shutdown() throws IOException;
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
similarity index 74%
copy from
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
copy to
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
index b38fc43..89ac714 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
@@ -22,53 +22,55 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
import java.io.IOException;
/**
- * SCMHAManager provides HA service for SCM.
- *
- * It uses Apache Ratis for HA implementation. We will have a 2N+1
+ * SCMHAManagerImpl uses Apache Ratis for HA implementation. We will have 2N+1
* node Ratis ring. The Ratis ring will have one Leader node and 2N follower
* nodes.
*
* TODO
*
*/
-public class SCMHAManager {
+public class SCMHAManagerImpl implements SCMHAManager {
private static boolean isLeader = true;
- private final SCMRatisServer ratisServer;
+ private final SCMRatisServerImpl ratisServer;
/**
* Creates SCMHAManager instance.
*/
- public SCMHAManager(final ConfigurationSource conf) throws IOException {
- this.ratisServer = new SCMRatisServer(
+ public SCMHAManagerImpl(final ConfigurationSource conf) throws IOException {
+ this.ratisServer = new SCMRatisServerImpl(
conf.getObject(SCMHAConfiguration.class), conf);
}
/**
- * Starts HA service.
+ * {@inheritDoc}
*/
+ @Override
public void start() throws IOException {
ratisServer.start();
}
/**
- * Returns true if the current SCM is the leader.
+ * {@inheritDoc}
*/
- public static boolean isLeader() {
+ @Override
+ public boolean isLeader() {
return isLeader;
}
/**
- * Returns RatisServer instance associated with the SCM instance.
+ * {@inheritDoc}
*/
+ @Override
public SCMRatisServer getRatisServer() {
return ratisServer;
}
/**
- * Stops the HA service.
+ * {@inheritDoc}
*/
+ @Override
public void shutdown() throws IOException {
ratisServer.stop();
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisResponse.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisResponse.java
index c4bedcc..21ca4be 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisResponse.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisResponse.java
@@ -67,6 +67,10 @@ public final class SCMRatisResponse {
public static Message encode(final Object result)
throws InvalidProtocolBufferException {
+ if (result == null) {
+ return Message.EMPTY;
+ }
+
final ByteString value;
if (result instanceof GeneratedMessage) {
value = ((GeneratedMessage) result).toByteString();
@@ -98,6 +102,10 @@ public final class SCMRatisResponse {
private static Object deserializeResult(byte[] response)
throws InvalidProtocolBufferException {
+ if (response.length == 0) {
+ return null;
+ }
+
final SCMRatisResponseProto responseProto =
SCMRatisResponseProto.parseFrom(response);
try {
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java
index 209535d..4ddbc7b 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java
@@ -17,93 +17,22 @@
package org.apache.hadoop.hdds.scm.ha;
+import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
+
import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
-import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
-import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.protocol.RaftClientRequest;
-import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.protocol.RaftGroupId;
-import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.server.RaftServer;
/**
* TODO.
*/
-public class SCMRatisServer {
-
- private final InetSocketAddress address;
- private final RaftServer server;
- private final RaftGroupId raftGroupId;
- private final RaftGroup raftGroup;
- private final RaftPeerId raftPeerId;
- private final SCMStateMachine scmStateMachine;
- private final ClientId clientId = ClientId.randomId();
- private final AtomicLong callId = new AtomicLong();
+public interface SCMRatisServer {
+ void start() throws IOException;
- // TODO: Refactor and remove ConfigurationSource and use only
- // SCMHAConfiguration.
- SCMRatisServer(final SCMHAConfiguration haConf,
- final ConfigurationSource conf)
- throws IOException {
- final String scmServiceId = "SCM-HA-Service";
- final String scmNodeId = "localhost";
- this.raftPeerId = RaftPeerId.getRaftPeerId(scmNodeId);
- this.address = haConf.getRatisBindAddress();
- final RaftPeer localRaftPeer = new RaftPeer(raftPeerId, address);
- final List<RaftPeer> raftPeers = new ArrayList<>();
- raftPeers.add(localRaftPeer);
- final RaftProperties serverProperties = RatisUtil
- .newRaftProperties(haConf, conf);
- this.raftGroupId = RaftGroupId.valueOf(
- UUID.nameUUIDFromBytes(scmServiceId.getBytes(StandardCharsets.UTF_8)));
- this.raftGroup = RaftGroup.valueOf(raftGroupId, raftPeers);
- this.scmStateMachine = new SCMStateMachine();
- this.server = RaftServer.newBuilder()
- .setServerId(raftPeerId)
- .setGroup(raftGroup)
- .setProperties(serverProperties)
- .setStateMachine(scmStateMachine)
- .build();
- }
-
- void start() throws IOException {
- server.start();
- }
-
- public void registerStateMachineHandler(final RequestType handlerType,
- final Object handler) {
- scmStateMachine.registerHandler(handlerType, handler);
- }
+ void registerStateMachineHandler(RequestType handlerType, Object handler);
SCMRatisResponse submitRequest(SCMRatisRequest request)
- throws IOException, ExecutionException, InterruptedException {
- final RaftClientRequest raftClientRequest = new RaftClientRequest(
- clientId, server.getId(), raftGroupId, nextCallId(), request.encode(),
- RaftClientRequest.writeRequestType(), null);
- final RaftClientReply raftClientReply =
- server.submitClientRequestAsync(raftClientRequest).get();
- return SCMRatisResponse.decode(raftClientReply);
- }
-
- private long nextCallId() {
- return callId.getAndIncrement() & Long.MAX_VALUE;
- }
-
- void stop() throws IOException {
- server.close();
- }
+ throws IOException, ExecutionException, InterruptedException;
+ void stop() throws IOException;
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
similarity index 91%
copy from
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java
copy to
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
index 209535d..45ae212 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
@@ -41,7 +41,7 @@ import org.apache.ratis.server.RaftServer;
/**
* TODO.
*/
-public class SCMRatisServer {
+public class SCMRatisServerImpl implements SCMRatisServer {
private final InetSocketAddress address;
private final RaftServer server;
@@ -55,8 +55,8 @@ public class SCMRatisServer {
// TODO: Refactor and remove ConfigurationSource and use only
// SCMHAConfiguration.
- SCMRatisServer(final SCMHAConfiguration haConf,
- final ConfigurationSource conf)
+ SCMRatisServerImpl(final SCMHAConfiguration haConf,
+ final ConfigurationSource conf)
throws IOException {
final String scmServiceId = "SCM-HA-Service";
final String scmNodeId = "localhost";
@@ -79,16 +79,19 @@ public class SCMRatisServer {
.build();
}
- void start() throws IOException {
+ @Override
+ public void start() throws IOException {
server.start();
}
+ @Override
public void registerStateMachineHandler(final RequestType handlerType,
final Object handler) {
scmStateMachine.registerHandler(handlerType, handler);
}
- SCMRatisResponse submitRequest(SCMRatisRequest request)
+ @Override
+ public SCMRatisResponse submitRequest(SCMRatisRequest request)
throws IOException, ExecutionException, InterruptedException {
final RaftClientRequest raftClientRequest = new RaftClientRequest(
clientId, server.getId(), raftGroupId, nextCallId(), request.encode(),
@@ -102,7 +105,8 @@ public class SCMRatisServer {
return callId.getAndIncrement() & Long.MAX_VALUE;
}
- void stop() throws IOException {
+ @Override
+ public void stop() throws IOException {
server.close();
}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java
new file mode 100644
index 0000000..022d392
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container;
+
+import java.io.File;
+import java.util.UUID;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
+import org.apache.hadoop.hdds.scm.pipeline.MockPipelineManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
+import org.apache.hadoop.ozone.container.common.SCMTestUtils;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+
+/**
+ * Tests to verify the functionality of ContainerManager.
+ */
+public class TestContainerManagerImpl {
+
+ private File testDir;
+ private DBStore dbStore;
+ private ContainerManagerV2 containerManager;
+
+ @Before
+ public void setUp() throws Exception {
+ final OzoneConfiguration conf = SCMTestUtils.getConf();
+ testDir = GenericTestUtils.getTestDir(
+ TestContainerManagerImpl.class.getSimpleName() + UUID.randomUUID());
+ conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+ dbStore = DBStoreBuilder.createDBStore(
+ conf, new SCMDBDefinition());
+ final PipelineManager pipelineManager = MockPipelineManager.getInstance();
+ pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.THREE);
+ containerManager = new ContainerManagerImpl(conf,
+ MockSCMHAManager.getInstance(), pipelineManager,
+ SCMDBDefinition.CONTAINERS.getTable(dbStore));
+ }
+
+ @After
+ public void cleanup() throws Exception {
+ if(containerManager != null) {
+ containerManager.close();
+ }
+
+ if (dbStore != null) {
+ dbStore.close();
+ }
+
+ FileUtil.fullyDelete(testDir);
+ }
+
+ @Test
+ public void testAllocateContainer() throws Exception {
+ Assert.assertTrue(containerManager.getContainerIDs().isEmpty());
+ final ContainerInfo container = containerManager.allocateContainer(
+ HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.THREE, "admin");
+ Assert.assertEquals(1, containerManager.getContainerIDs().size());
+ Assert.assertNotNull(containerManager.getContainer(
+ container.containerID()));
+ }
+
+}
\ No newline at end of file
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
new file mode 100644
index 0000000..c3b14fb
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftGroupMemberId;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.StateMachineException;
+
+/**
+ * Mock SCMHAManager implementation for testing.
+ */
+public final class MockSCMHAManager implements SCMHAManager {
+
+ private final SCMRatisServer ratisServer;
+
+ public static SCMHAManager getInstance() {
+ return new MockSCMHAManager();
+ }
+
+ /**
+ * Creates MockSCMHAManager instance.
+ */
+ private MockSCMHAManager() {
+ this.ratisServer = new MockRatisServer();
+ }
+
+ @Override
+ public void start() throws IOException {
+ ratisServer.start();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean isLeader() {
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public SCMRatisServer getRatisServer() {
+ return ratisServer;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void shutdown() throws IOException {
+ ratisServer.stop();
+ }
+
+ private static class MockRatisServer implements SCMRatisServer {
+
+ private Map<RequestType, Object> handlers =
+ new EnumMap<>(RequestType.class);
+
+ @Override
+ public void start() {
+ }
+
+ @Override
+ public void registerStateMachineHandler(final RequestType handlerType,
+ final Object handler) {
+ handlers.put(handlerType, handler);
+ }
+
+ @Override
+ public SCMRatisResponse submitRequest(final SCMRatisRequest request)
+ throws IOException {
+ final RaftGroupMemberId raftId = RaftGroupMemberId.valueOf(
+ RaftPeerId.valueOf("peer"), RaftGroupId.randomId());
+ RaftClientReply reply;
+ try {
+ final Message result = process(request);
+ return SCMRatisResponse.decode(new RaftClientReply(ClientId.randomId(),
+ raftId, 1L, true, result, null, 1L, null));
+ } catch (Exception ex) {
+ return SCMRatisResponse.decode(new RaftClientReply(ClientId.randomId(),
+ raftId, 1L, false, null,
+ new StateMachineException(raftId, ex), 1L, null));
+ }
+ }
+
+ private Message process(final SCMRatisRequest request)
+ throws Exception {
+ try {
+ final Object handler = handlers.get(request.getType());
+
+ if (handler == null) {
+ throw new IOException("No handler found for request type " +
+ request.getType());
+ }
+
+ final List<Class<?>> argumentTypes = new ArrayList<>();
+ for(Object args : request.getArguments()) {
+ argumentTypes.add(args.getClass());
+ }
+ final Object result = handler.getClass().getMethod(
+ request.getOperation(), argumentTypes.toArray(new Class<?>[0]))
+ .invoke(handler, request.getArguments());
+
+ return SCMRatisResponse.encode(result);
+ } catch (NoSuchMethodException | SecurityException ex) {
+ throw new InvalidProtocolBufferException(ex.getMessage());
+ } catch (InvocationTargetException e) {
+ final Exception targetEx = (Exception) e.getTargetException();
+ throw targetEx != null ? targetEx : e;
+ }
+ }
+
+ @Override
+ public void stop() {
+ }
+ }
+
+}
\ No newline at end of file
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
new file mode 100644
index 0000000..5dd6082
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Mock PipelineManager implementation for testing.
+ */
+public final class MockPipelineManager implements PipelineManager {
+
+ private PipelineStateManager stateManager;
+
+ public static PipelineManager getInstance() {
+ return new MockPipelineManager();
+ }
+
+ private MockPipelineManager() {
+ this.stateManager = new PipelineStateManager();
+ }
+
+ @Override
+ public Pipeline createPipeline(final ReplicationType type,
+ final ReplicationFactor factor)
+ throws IOException {
+ final List<DatanodeDetails> nodes = Stream.generate(
+ MockDatanodeDetails::randomDatanodeDetails)
+ .limit(factor.getNumber()).collect(Collectors.toList());
+ final Pipeline pipeline = Pipeline.newBuilder()
+ .setId(PipelineID.randomId())
+ .setType(type)
+ .setFactor(factor)
+ .setNodes(nodes)
+ .setState(Pipeline.PipelineState.OPEN)
+ .build();
+ stateManager.addPipeline(pipeline);
+ return pipeline;
+ }
+
+ @Override
+ public Pipeline createPipeline(final ReplicationType type,
+ final ReplicationFactor factor,
+ final List<DatanodeDetails> nodes) {
+ return Pipeline.newBuilder()
+ .setId(PipelineID.randomId())
+ .setType(type)
+ .setFactor(factor)
+ .setNodes(nodes)
+ .setState(Pipeline.PipelineState.OPEN)
+ .build();
+ }
+
+ @Override
+ public Pipeline getPipeline(final PipelineID pipelineID)
+ throws PipelineNotFoundException {
+ return stateManager.getPipeline(pipelineID);
+ }
+
+ @Override
+ public boolean containsPipeline(final PipelineID pipelineID) {
+ try {
+ return stateManager.getPipeline(pipelineID) != null;
+ } catch (PipelineNotFoundException e) {
+ return false;
+ }
+ }
+
+ @Override
+ public List<Pipeline> getPipelines() {
+ return stateManager.getPipelines();
+ }
+
+ @Override
+ public List<Pipeline> getPipelines(final ReplicationType type) {
+ return stateManager.getPipelines(type);
+ }
+
+ @Override
+ public List<Pipeline> getPipelines(final ReplicationType type,
+ final ReplicationFactor factor) {
+ return stateManager.getPipelines(type, factor);
+ }
+
+ @Override
+ public List<Pipeline> getPipelines(final ReplicationType type,
+ final Pipeline.PipelineState state) {
+ return stateManager.getPipelines(type, state);
+ }
+
+ @Override
+ public List<Pipeline> getPipelines(final ReplicationType type,
+ final ReplicationFactor factor,
+ final Pipeline.PipelineState state) {
+ return stateManager.getPipelines(type, factor, state);
+ }
+
+ @Override
+ public List<Pipeline> getPipelines(final ReplicationType type,
+ final ReplicationFactor factor, final Pipeline.PipelineState state,
+ final Collection<DatanodeDetails> excludeDns,
+ final Collection<PipelineID> excludePipelines) {
+ return stateManager.getPipelines(type, factor, state,
+ excludeDns, excludePipelines);
+ }
+
+ @Override
+ public void addContainerToPipeline(final PipelineID pipelineID,
+ final ContainerID containerID)
+ throws IOException {
+ stateManager.addContainerToPipeline(pipelineID, containerID);
+ }
+
+ @Override
+ public void removeContainerFromPipeline(final PipelineID pipelineID,
+ final ContainerID containerID)
+ throws IOException {
+ stateManager.removeContainerFromPipeline(pipelineID, containerID);
+ }
+
+ @Override
+ public NavigableSet<ContainerID> getContainersInPipeline(
+ final PipelineID pipelineID) throws IOException {
+ return getContainersInPipeline(pipelineID);
+ }
+
+ @Override
+ public int getNumberOfContainers(final PipelineID pipelineID)
+ throws IOException {
+ return getContainersInPipeline(pipelineID).size();
+ }
+
+ @Override
+ public void openPipeline(final PipelineID pipelineId)
+ throws IOException {
+ stateManager.openPipeline(pipelineId);
+ }
+
+ @Override
+ public void finalizeAndDestroyPipeline(final Pipeline pipeline,
+ final boolean onTimeout)
+ throws IOException {
+ stateManager.finalizePipeline(pipeline.getId());
+ }
+
+ @Override
+ public void scrubPipeline(final ReplicationType type,
+ final ReplicationFactor factor)
+ throws IOException {
+
+ }
+
+ @Override
+ public void startPipelineCreator() {
+
+ }
+
+ @Override
+ public void triggerPipelineCreation() {
+
+ }
+
+ @Override
+ public void incNumBlocksAllocatedMetric(final PipelineID id) {
+
+ }
+
+ @Override
+ public void activatePipeline(final PipelineID pipelineID)
+ throws IOException {
+
+ }
+
+ @Override
+ public void deactivatePipeline(final PipelineID pipelineID)
+ throws IOException {
+ stateManager.deactivatePipeline(pipelineID);
+ }
+
+ @Override
+ public boolean getSafeModeStatus() {
+ return false;
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ @Override
+ public Map<String, Integer> getPipelineInfo() {
+ return null;
+ }
+
+ @Override
+ public void onMessage(final SCMSafeModeManager.SafeModeStatus safeModeStatus,
+ final EventPublisher publisher) {
+
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]