This is an automated email from the ASF dual-hosted git repository.
licheng pushed a commit to branch HDDS-2823
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/HDDS-2823 by this push:
new 1ea9555 HDDS-3186. Introduce generic SCMRatisRequest and
SCMRatisResponse. (#959)
1ea9555 is described below
commit 1ea9555d5e4426902995f7b9c5cd9fde167e3c1c
Author: Nandakumar <[email protected]>
AuthorDate: Tue May 26 13:26:16 2020 +0530
HDDS-3186. Introduce generic SCMRatisRequest and SCMRatisResponse. (#959)
* HDDS-3186. Initial version.
* HDDS-3186. Additional changes.
---
.../server-scm/dev-support/findbugsExcludeFile.xml | 21 ++
hadoop-hdds/server-scm/pom.xml | 38 ++
.../hdds/scm/container/ContainerManagerImpl.java | 282 +++++++++++++++
.../hdds/scm/container/ContainerManagerV2.java | 189 ++++++++++
.../scm/container/ContainerStateManagerImpl.java | 397 +++++++++++++++++++++
.../scm/container/ContainerStateManagerV2.java | 75 ++++
.../org/apache/hadoop/hdds/scm/ha/RatisUtil.java | 155 ++++++++
.../apache/hadoop/hdds/scm/ha/ReflectionUtil.java | 67 ++++
.../hadoop/hdds/scm/ha/SCMHAConfiguration.java | 225 ++++++++++++
.../hadoop/hdds/scm/ha/SCMHAInvocationHandler.java | 93 +++++
.../apache/hadoop/hdds/scm/ha/SCMHAManager.java | 76 ++++
.../apache/hadoop/hdds/scm/ha/SCMRatisRequest.java | 147 ++++++++
.../hadoop/hdds/scm/ha/SCMRatisResponse.java | 127 +++++++
.../apache/hadoop/hdds/scm/ha/SCMRatisServer.java | 109 ++++++
.../apache/hadoop/hdds/scm/ha/SCMStateMachine.java | 92 +++++
.../apache/hadoop/hdds/scm/metadata/Replicate.java | 33 ++
.../src/main/proto/SCMRatisProtocol.proto | 46 +++
17 files changed, 2172 insertions(+)
diff --git a/hadoop-hdds/server-scm/dev-support/findbugsExcludeFile.xml b/hadoop-hdds/server-scm/dev-support/findbugsExcludeFile.xml
new file mode 100644
index 0000000..3571a89
--- /dev/null
+++ b/hadoop-hdds/server-scm/dev-support/findbugsExcludeFile.xml
@@ -0,0 +1,21 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<FindBugsFilter>
+ <Match>
+ <Package name="org.apache.hadoop.hdds.protocol.proto"/>
+ </Match>
+</FindBugsFilter>
diff --git a/hadoop-hdds/server-scm/pom.xml b/hadoop-hdds/server-scm/pom.xml
index dcbc42a..8c17aae 100644
--- a/hadoop-hdds/server-scm/pom.xml
+++ b/hadoop-hdds/server-scm/pom.xml
@@ -128,6 +128,11 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<artifactId>hadoop-hdds-hadoop-dependency-test</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
@@ -163,6 +168,39 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-maven-plugins</artifactId>
+ <executions>
+ <execution>
+ <id>compile-protoc</id>
+ <goals>
+ <goal>protoc</goal>
+ </goals>
+ <configuration>
+ <protocVersion>${protobuf.version}</protocVersion>
+ <imports>
+ <param>
+ ${basedir}/src/main/proto
+ </param>
+ </imports>
+ <source>
+ <directory>${basedir}/src/main/proto</directory>
+ <includes>
+ <include>SCMRatisProtocol.proto</include>
+ </includes>
+ </source>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>com.github.spotbugs</groupId>
+ <artifactId>spotbugs-maven-plugin</artifactId>
+ <configuration>
+ <excludeFilterFile>${basedir}/dev-support/findbugsExcludeFile.xml</excludeFilterFile>
+ </configuration>
+ </plugin>
</plugins>
<testResources>
<testResource>
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
new file mode 100644
index 0000000..0404530
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TODO: Add javadoc.
+ */
+public class ContainerManagerImpl implements ContainerManagerV2 {
+
+ /**
+ *
+ */
+ private static final Logger LOG = LoggerFactory.getLogger(
+ ContainerManagerImpl.class);
+
+ /**
+ *
+ */
+ private final ReadWriteLock lock;
+
+ /**
+ *
+ */
+ private final PipelineManager pipelineManager;
+
+ /**
+ *
+ */
+ private final ContainerStateManagerV2 containerStateManager;
+
+ /**
+ *
+ */
+ public ContainerManagerImpl(
+ // Introduce builder for this class?
+ final Configuration conf, final PipelineManager pipelineManager,
+ final SCMHAManager scmhaManager,
+ final Table<ContainerID, ContainerInfo> containerStore)
+ throws IOException {
+ this.lock = new ReentrantReadWriteLock();
+ this.pipelineManager = pipelineManager;
+ this.containerStateManager = ContainerStateManagerImpl.newBuilder()
+ .setConfiguration(conf)
+ .setPipelineManager(pipelineManager)
+ .setRatisServer(scmhaManager.getRatisServer())
+ .setContainerStore(containerStore)
+ .build();
+ }
+
+ @Override
+ public Set<ContainerID> getContainerIDs() {
+ lock.readLock().lock();
+ try {
+ return containerStateManager.getContainerIDs();
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public Set<ContainerInfo> getContainers() {
+ lock.readLock().lock();
+ try {
+ return containerStateManager.getContainerIDs().stream().map(id -> {
+ try {
+ return containerStateManager.getContainer(id);
+ } catch (ContainerNotFoundException e) {
+ // How can this happen? o_O
+ return null;
+ }
+ }).filter(Objects::nonNull).collect(Collectors.toSet());
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public ContainerInfo getContainer(final ContainerID containerID)
+ throws ContainerNotFoundException {
+ lock.readLock().lock();
+ try {
+ return containerStateManager.getContainer(containerID);
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public Set<ContainerInfo> getContainers(final LifeCycleState state) {
+ lock.readLock().lock();
+ try {
+ return containerStateManager.getContainerIDs(state).stream().map(id -> {
+ try {
+ return containerStateManager.getContainer(id);
+ } catch (ContainerNotFoundException e) {
+ // How can this happen? o_O
+ return null;
+ }
+ }).filter(Objects::nonNull).collect(Collectors.toSet());
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public boolean exists(final ContainerID containerID) {
+ lock.readLock().lock();
+ try {
+ return (containerStateManager.getContainer(containerID) != null);
+ } catch (ContainerNotFoundException ex) {
+ return false;
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public List<ContainerInfo> listContainers(final ContainerID startID,
+ final int count) {
+ lock.readLock().lock();
+ try {
+ final long startId = startID == null ? 0 : startID.getId();
+ final List<ContainerID> containersIds =
+ new ArrayList<>(containerStateManager.getContainerIDs());
+ Collections.sort(containersIds);
+ return containersIds.stream()
+ .filter(id -> id.getId() > startId)
+ .limit(count)
+ .map(id -> {
+ try {
+ return containerStateManager.getContainer(id);
+ } catch (ContainerNotFoundException ex) {
+ // This can never happen, as we hold lock no one else can remove
+ // the container after we got the container ids.
+ LOG.warn("Container Missing.", ex);
+ return null;
+ }
+ }).collect(Collectors.toList());
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public ContainerInfo allocateContainer(final ReplicationType type,
+ final ReplicationFactor replicationFactor, final String owner)
+ throws IOException {
+ lock.writeLock().lock();
+ try {
+ final List<Pipeline> pipelines = pipelineManager
+ .getPipelines(type, replicationFactor, Pipeline.PipelineState.OPEN);
+
+ if (pipelines.isEmpty()) {
+ throw new IOException("Could not allocate container. Cannot get any" +
+ " matching pipeline for Type:" + type + ", Factor:" +
+ replicationFactor + ", State:PipelineState.OPEN");
+ }
+
+ final ContainerID containerID = containerStateManager
+ .getNextContainerID();
+ final Pipeline pipeline = pipelines.get(
+ (int) containerID.getId() % pipelines.size());
+
+ final ContainerInfoProto containerInfo = ContainerInfoProto.newBuilder()
+ .setState(LifeCycleState.OPEN)
+ .setPipelineID(pipeline.getId().getProtobuf())
+ .setUsedBytes(0)
+ .setNumberOfKeys(0)
+ .setStateEnterTime(Time.now())
+ .setOwner(owner)
+ .setContainerID(containerID.getId())
+ .setDeleteTransactionId(0)
+ .setReplicationFactor(pipeline.getFactor())
+ .setReplicationType(pipeline.getType())
+ .build();
+ containerStateManager.addContainer(containerInfo);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("New container allocated: {}", containerInfo);
+ }
+ return containerStateManager.getContainer(containerID);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ @Override
+ public void deleteContainer(final ContainerID containerID)
+ throws ContainerNotFoundException {
+ throw new UnsupportedOperationException("Not yet implemented!");
+ }
+
+ @Override
+ public void updateContainerState(final ContainerID containerID,
+ final LifeCycleEvent event)
+ throws ContainerNotFoundException {
+ throw new UnsupportedOperationException("Not yet implemented!");
+ }
+
+ @Override
+ public Set<ContainerReplica> getContainerReplicas(
+ final ContainerID containerID) throws ContainerNotFoundException {
+ throw new UnsupportedOperationException("Not yet implemented!");
+ }
+
+ @Override
+ public void updateContainerReplica(final ContainerID containerID,
+ final ContainerReplica replica)
+ throws ContainerNotFoundException {
+ throw new UnsupportedOperationException("Not yet implemented!");
+ }
+
+ @Override
+ public void removeContainerReplica(final ContainerID containerID,
+ final ContainerReplica replica)
+ throws ContainerNotFoundException, ContainerReplicaNotFoundException {
+ throw new UnsupportedOperationException("Not yet implemented!");
+ }
+
+ @Override
+ public void updateDeleteTransactionId(
+ final Map<ContainerID, Long> deleteTransactionMap) throws IOException {
+ throw new UnsupportedOperationException("Not yet implemented!");
+ }
+
+ @Override
+ public ContainerInfo getMatchingContainer(final long size, final String owner,
+ final Pipeline pipeline, final List<ContainerID> excludedContainerIDS) {
+ throw new UnsupportedOperationException("Not yet implemented!");
+ }
+
+ @Override
+ public void notifyContainerReportProcessing(final boolean isFullReport,
+ final boolean success) {
+ throw new UnsupportedOperationException("Not yet implemented!");
+ }
+
+ @Override
+ public void close() throws IOException {
+ throw new UnsupportedOperationException("Not yet implemented!");
+ }
+
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerV2.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerV2.java
new file mode 100644
index 0000000..37c7b70
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerV2.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+
+/**
+ * TODO: Add extensive javadoc.
+ *
+ * ContainerManager class contains the mapping from a name to a pipeline
+ * mapping. This is used by SCM when allocating new locations and when
+ * looking up a key.
+ */
+public interface ContainerManagerV2 extends Closeable {
+
+
+ /**
+ * Returns all the container Ids managed by ContainerManager.
+ *
+ * @return Set of ContainerID
+ */
+ Set<ContainerID> getContainerIDs();
+
+ /**
+ * Returns all the containers managed by ContainerManager.
+ *
+ * @return List of ContainerInfo
+ */
+ Set<ContainerInfo> getContainers();
+
+ /**
+ * Returns all the containers which are in the specified state.
+ *
+ * @return List of ContainerInfo
+ */
+ Set<ContainerInfo> getContainers(LifeCycleState state);
+
+ /**
+ * Returns the ContainerInfo from the container ID.
+ *
+ */
+ ContainerInfo getContainer(ContainerID containerID)
+ throws ContainerNotFoundException;
+
+ boolean exists(ContainerID containerID);
+
+ /**
+ * Returns containers under certain conditions.
+ * Search container IDs from start ID(exclusive),
+ * The max size of the searching range cannot exceed the
+ * value of count.
+ *
+ * @param startID start containerID, >=0,
+ * start searching at the head if 0.
+ * @param count count must be >= 0
+ * Usually the count will be replace with a very big
+ * value instead of being unlimited in case the db is very big.
+ *
+ * @return a list of container.
+ */
+ List<ContainerInfo> listContainers(ContainerID startID, int count);
+
+ /**
+ * Allocates a new container for a given keyName and replication factor.
+ *
+ * @param replicationFactor - replication factor of the container.
+ * @param owner
+ * @return - ContainerInfo.
+ * @throws IOException
+ */
+ ContainerInfo allocateContainer(ReplicationType type,
+ ReplicationFactor replicationFactor,
+ String owner) throws IOException;
+
+ /**
+ * Deletes a container from SCM.
+ *
+ * @param containerID - Container ID
+ * @throws IOException
+ */
+ void deleteContainer(ContainerID containerID)
+ throws ContainerNotFoundException;
+
+ /**
+ * Update container state.
+ * @param containerID - Container ID
+ * @param event - container life cycle event
+ * @throws IOException
+ */
+ void updateContainerState(ContainerID containerID,
+ LifeCycleEvent event)
+ throws ContainerNotFoundException;
+
+ /**
+ * Returns the latest list of replicas for given containerId.
+ *
+ * @param containerID Container ID
+ * @return Set of ContainerReplica
+ */
+ Set<ContainerReplica> getContainerReplicas(ContainerID containerID)
+ throws ContainerNotFoundException;
+
+ /**
+ * Adds a container Replica for the given Container.
+ *
+ * @param containerID Container ID
+ * @param replica ContainerReplica
+ */
+ void updateContainerReplica(ContainerID containerID, ContainerReplica replica)
+ throws ContainerNotFoundException;
+
+ /**
+ * Remove a container Replica form a given Container.
+ *
+ * @param containerID Container ID
+ * @param replica ContainerReplica
+ * @return True of dataNode is removed successfully else false.
+ */
+ void removeContainerReplica(ContainerID containerID, ContainerReplica replica)
+ throws ContainerNotFoundException, ContainerReplicaNotFoundException;
+
+ /**
+ * Update deleteTransactionId according to deleteTransactionMap.
+ *
+ * @param deleteTransactionMap Maps the containerId to latest delete
+ * transaction id for the container.
+ * @throws IOException
+ */
+ void updateDeleteTransactionId(Map<ContainerID, Long> deleteTransactionMap)
+ throws IOException;
+
+ /**
+ * Returns ContainerInfo which matches the requirements.
+ * @param size - the amount of space required in the container
+ * @param owner - the user which requires space in its owned container
+ * @param pipeline - pipeline to which the container should belong
+ * @return ContainerInfo for the matching container.
+ */
+ default ContainerInfo getMatchingContainer(long size, String owner,
+ Pipeline pipeline) {
+ return getMatchingContainer(size, owner, pipeline, Collections.emptyList());
+ }
+
+ /**
+ * Returns ContainerInfo which matches the requirements.
+ * @param size - the amount of space required in the container
+ * @param owner - the user which requires space in its owned container
+ * @param pipeline - pipeline to which the container should belong.
+ * @param excludedContainerIDS - containerIds to be excluded.
+ * @return ContainerInfo for the matching container.
+ */
+ ContainerInfo getMatchingContainer(long size, String owner,
+ Pipeline pipeline,
+ List<ContainerID> excludedContainerIDS);
+
+ /**
+ * Once after report processor handler completes, call this to notify
+ * container manager to increment metrics.
+ * @param isFullReport
+ * @param success
+ */
+ // Is it possible to remove this from the Interface?
+ void notifyContainerReportProcessing(boolean isFullReport, boolean success);
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java
new file mode 100644
index 0000000..16fe340
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container;
+
+import java.io.IOException;
+
+import java.lang.reflect.Proxy;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
+import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
+import org.apache.hadoop.hdds.scm.ha.SCMHAInvocationHandler;
+import org.apache.hadoop.hdds.scm.ha.SCMRatisServer;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.states.ContainerState;
+import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.ozone.common.statemachine.StateMachine;
+
+/**
+ * TODO: Add javadoc.
+ */
+public final class ContainerStateManagerImpl
+ implements ContainerStateManagerV2 {
+
+ /* **********************************************************************
+ * Container Life Cycle *
+ * *
+ * Event and State Transition Mapping: *
+ * *
+ * State: OPEN ----------------> CLOSING *
+ * Event: FINALIZE *
+ * *
+ * State: CLOSING ----------------> QUASI_CLOSED *
+ * Event: QUASI_CLOSE *
+ * *
+ * State: CLOSING ----------------> CLOSED *
+ * Event: CLOSE *
+ * *
+ * State: QUASI_CLOSED ----------------> CLOSED *
+ * Event: FORCE_CLOSE *
+ * *
+ * State: CLOSED ----------------> DELETING *
+ * Event: DELETE *
+ * *
+ * State: DELETING ----------------> DELETED *
+ * Event: CLEANUP *
+ * *
+ * *
+ * Container State Flow: *
+ * *
+ * [OPEN]--------------->[CLOSING]--------------->[QUASI_CLOSED] *
+ * (FINALIZE) | (QUASI_CLOSE) | *
+ * | | *
+ * | | *
+ * (CLOSE) | (FORCE_CLOSE) | *
+ * | | *
+ * | | *
+ * +--------->[CLOSED]<--------+ *
+ * | *
+ * (DELETE)| *
+ * | *
+ * | *
+ * [DELETING] *
+ * | *
+ * (CLEANUP) | *
+ * | *
+ * V *
+ * [DELETED] *
+ * *
+ ************************************************************************/
+
+ /**
+ *
+ */
+ private static final Logger LOG = LoggerFactory.getLogger(
+ ContainerStateManagerImpl.class);
+
+ /**
+ *
+ */
+ private final long containerSize;
+
+ /**
+ *
+ */
+ private final AtomicLong nextContainerID;
+
+ /**
+ *
+ */
+ private final ContainerStateMap containers;
+
+ /**
+ *
+ */
+ private final PipelineManager pipelineManager;
+
+ /**
+ *
+ */
+ private Table<ContainerID, ContainerInfo> containerStore;
+
+ /**
+ *
+ */
+ private final StateMachine<LifeCycleState, LifeCycleEvent> stateMachine;
+
+ /**
+ *
+ */
+ private final ConcurrentHashMap<ContainerState, ContainerID> lastUsedMap;
+
+ /**
+ *
+ */
+ private ContainerStateManagerImpl(final Configuration conf,
+ final PipelineManager pipelineManager,
+ final Table<ContainerID, ContainerInfo> containerStore)
+ throws IOException {
+ this.pipelineManager = pipelineManager;
+ this.containerStore = containerStore;
+ this.stateMachine = newStateMachine();
+ this.containerSize = getConfiguredContainerSize(conf);
+ this.nextContainerID = new AtomicLong();
+ this.containers = new ContainerStateMap();
+ this.lastUsedMap = new ConcurrentHashMap<>();
+
+ initialize();
+ }
+
+ /**
+ *
+ */
+ private StateMachine<LifeCycleState, LifeCycleEvent> newStateMachine() {
+
+ final Set<LifeCycleState> finalStates = new HashSet<>();
+
+ // These are the steady states of a container.
+ finalStates.add(LifeCycleState.OPEN);
+ finalStates.add(LifeCycleState.CLOSED);
+ finalStates.add(LifeCycleState.DELETED);
+
+ final StateMachine<LifeCycleState, LifeCycleEvent> containerLifecycleSM =
+ new StateMachine<>(LifeCycleState.OPEN, finalStates);
+
+ containerLifecycleSM.addTransition(LifeCycleState.OPEN,
+ LifeCycleState.CLOSING,
+ LifeCycleEvent.FINALIZE);
+
+ containerLifecycleSM.addTransition(LifeCycleState.CLOSING,
+ LifeCycleState.QUASI_CLOSED,
+ LifeCycleEvent.QUASI_CLOSE);
+
+ containerLifecycleSM.addTransition(LifeCycleState.CLOSING,
+ LifeCycleState.CLOSED,
+ LifeCycleEvent.CLOSE);
+
+ containerLifecycleSM.addTransition(LifeCycleState.QUASI_CLOSED,
+ LifeCycleState.CLOSED,
+ LifeCycleEvent.FORCE_CLOSE);
+
+ containerLifecycleSM.addTransition(LifeCycleState.CLOSED,
+ LifeCycleState.DELETING,
+ LifeCycleEvent.DELETE);
+
+ containerLifecycleSM.addTransition(LifeCycleState.DELETING,
+ LifeCycleState.DELETED,
+ LifeCycleEvent.CLEANUP);
+
+ return containerLifecycleSM;
+ }
+
+ /**
+ *
+ */
+ private long getConfiguredContainerSize(final Configuration conf) {
+ return (long) conf.getStorageSize(
+ ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
+ ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT,
+ StorageUnit.BYTES);
+ }
+
+ /**
+ *
+ */
+ private void initialize() throws IOException {
+ TableIterator<ContainerID, ? extends KeyValue<ContainerID, ContainerInfo>>
+ iterator = containerStore.iterator();
+
+ while (iterator.hasNext()) {
+ final ContainerInfo container = iterator.next().getValue();
+ Preconditions.checkNotNull(container);
+ containers.addContainer(container);
+ nextContainerID.set(Long.max(container.containerID().getId(),
+ nextContainerID.get()));
+ if (container.getState() == LifeCycleState.OPEN) {
+ try {
+ pipelineManager.addContainerToPipeline(container.getPipelineID(),
+ ContainerID.valueof(container.getContainerID()));
+ } catch (PipelineNotFoundException ex) {
+ LOG.warn("Found container {} which is in OPEN state with " +
+ "pipeline {} that does not exist. Marking container for " +
+ "closing.", container, container.getPipelineID());
+ updateContainerState(container.containerID(),
+ LifeCycleEvent.FINALIZE);
+ }
+ }
+ }
+ }
+
+ @Override
+ public ContainerID getNextContainerID() {
+ return ContainerID.valueof(nextContainerID.get());
+ }
+
+ @Override
+ public Set<ContainerID> getContainerIDs() {
+ return containers.getAllContainerIDs();
+ }
+
+ @Override
+ public Set<ContainerID> getContainerIDs(final LifeCycleState state) {
+ return containers.getContainerIDsByState(state);
+ }
+
+ @Override
+ public ContainerInfo getContainer(final ContainerID containerID)
+ throws ContainerNotFoundException {
+ return containers.getContainerInfo(containerID);
+ }
+
+ @Override
+ public Set<ContainerReplica> getContainerReplicas(
+ final ContainerID containerID) throws ContainerNotFoundException {
+ return containers.getContainerReplicas(containerID);
+ }
+
+ @Override
+ public void addContainer(final ContainerInfoProto containerInfo)
+ throws IOException {
+
+ // Change the exception thrown to PipelineNotFound and
+ // ClosedPipelineException once ClosedPipelineException is introduced
+ // in PipelineManager.
+
+ Preconditions.checkNotNull(containerInfo);
+ final ContainerInfo container = ContainerInfo.fromProtobuf(containerInfo);
+ if (getContainer(container.containerID()) == null) {
+ Preconditions.checkArgument(nextContainerID.get()
+ == container.containerID().getId(),
+ "ContainerID mismatch.");
+
+ pipelineManager.addContainerToPipeline(
+ container.getPipelineID(), container.containerID());
+ containers.addContainer(container);
+ nextContainerID.incrementAndGet();
+ }
+ }
+
+ void updateContainerState(final ContainerID containerID,
+ final LifeCycleEvent event)
+ throws IOException {
+ throw new UnsupportedOperationException("Not yet implemented!");
+ }
+
+
+ void updateContainerReplica(final ContainerID containerID,
+ final ContainerReplica replica)
+ throws ContainerNotFoundException {
+ containers.updateContainerReplica(containerID, replica);
+ }
+
+
+ void updateDeleteTransactionId(
+ final Map<ContainerID, Long> deleteTransactionMap) {
+ throw new UnsupportedOperationException("Not yet implemented!");
+ }
+
+ ContainerInfo getMatchingContainer(final long size, String owner,
+ PipelineID pipelineID, NavigableSet<ContainerID> containerIDs) {
+ throw new UnsupportedOperationException("Not yet implemented!");
+ }
+
+
+ NavigableSet<ContainerID> getMatchingContainerIDs(final String owner,
+ final ReplicationType type, final ReplicationFactor factor,
+ final LifeCycleState state) {
+ throw new UnsupportedOperationException("Not yet implemented!");
+ }
+
+ void removeContainerReplica(final ContainerID containerID,
+ final ContainerReplica replica)
+ throws ContainerNotFoundException, ContainerReplicaNotFoundException {
+ throw new UnsupportedOperationException("Not yet implemented!");
+ }
+
+
+ void removeContainer(final ContainerID containerID)
+ throws ContainerNotFoundException {
+ throw new UnsupportedOperationException("Not yet implemented!");
+ }
+
+ void close() throws IOException {
+ }
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ /**
+ * Builder for ContainerStateManager.
+ */
+ public static class Builder {
+ private Configuration conf;
+ private PipelineManager pipelineMgr;
+ private SCMRatisServer scmRatisServer;
+ private Table<ContainerID, ContainerInfo> table;
+
+ public Builder setConfiguration(final Configuration config) {
+ conf = config;
+ return this;
+ }
+
+ public Builder setPipelineManager(final PipelineManager pipelineManager) {
+ pipelineMgr = pipelineManager;
+ return this;
+ }
+
+ public Builder setRatisServer(final SCMRatisServer ratisServer) {
+ scmRatisServer = ratisServer;
+ return this;
+ }
+
+ public Builder setContainerStore(
+ final Table<ContainerID, ContainerInfo> containerStore) {
+ table = containerStore;
+ return this;
+ }
+
+ public ContainerStateManagerV2 build() throws IOException {
+ Preconditions.checkNotNull(conf);
+ Preconditions.checkNotNull(pipelineMgr);
+ Preconditions.checkNotNull(scmRatisServer);
+ Preconditions.checkNotNull(table);
+
+ final ContainerStateManagerV2 csm = new ContainerStateManagerImpl(
+ conf, pipelineMgr, table);
+ scmRatisServer.registerStateMachineHandler(RequestType.CONTAINER, csm);
+
+ final SCMHAInvocationHandler invocationHandler =
+ new SCMHAInvocationHandler(RequestType.CONTAINER, csm,
+ scmRatisServer);
+
+ return (ContainerStateManagerV2) Proxy.newProxyInstance(
+ SCMHAInvocationHandler.class.getClassLoader(),
+ new Class<?>[]{ContainerStateManagerV2.class}, invocationHandler);
+ }
+
+ }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerV2.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerV2.java
new file mode 100644
index 0000000..9960354
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerV2.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.scm.metadata.Replicate;
+
+import java.io.IOException;
+import java.util.Set;
+
+/**
+ *
+ * TODO: Add proper javadoc.
+ *
+ * Implementation of methods marked with {@code @Replicate} annotation should
be
+ *
+ * 1. Idempotent
+ * 2. Arguments should be of protobuf objects
+ * 3. Return type should be of protobuf object
+ * 4. The declaration should throw RaftException
+ *
+ */
+public interface ContainerStateManagerV2 {
+
+ /**
+ *
+ */
+ ContainerID getNextContainerID();
+
+ /**
+ *
+ */
+ Set<ContainerID> getContainerIDs();
+
+ /**
+ *
+ */
+ Set<ContainerID> getContainerIDs(LifeCycleState state);
+
+ /**
+ *
+ */
+ ContainerInfo getContainer(ContainerID containerID)
+ throws ContainerNotFoundException;
+
+ /**
+ *
+ */
+ Set<ContainerReplica> getContainerReplicas(ContainerID containerID)
+ throws ContainerNotFoundException;
+
+ /**
+ *
+ */
+ @Replicate
+ void addContainer(ContainerInfoProto containerInfo)
+ throws IOException;
+
+}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/RatisUtil.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/RatisUtil.java
new file mode 100644
index 0000000..1bc1697
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/RatisUtil.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import com.google.common.base.Strings;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.ratis.RaftConfigKeys;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.rpc.RpcType;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.util.SizeInBytes;
+import org.apache.ratis.util.TimeDuration;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.ratis.server.RaftServerConfigKeys.Log;
+import static org.apache.ratis.server.RaftServerConfigKeys.RetryCache;
+import static org.apache.ratis.server.RaftServerConfigKeys.Rpc;
+import static org.apache.ratis.server.RaftServerConfigKeys.Snapshot;
+
+/**
+ * Ratis Util for SCM HA.
+ */
+public final class RatisUtil {
+
+ private RatisUtil() {
+ }
+
+
+ /**
+ * Constructs new Raft Properties instance using {@link SCMHAConfiguration}.
+ * @param haConf SCMHAConfiguration
+ * @param conf ConfigurationSource
+ */
+ public static RaftProperties newRaftProperties(
+ final SCMHAConfiguration haConf, final ConfigurationSource conf) {
+ //TODO: Remove ConfigurationSource!
+ // TODO: Check the default values.
+ final RaftProperties properties = new RaftProperties();
+ setRaftStorageDir(properties, haConf, conf);
+ setRaftRpcProperties(properties, haConf);
+ setRaftLogProperties(properties, haConf);
+ setRaftRetryCacheProperties(properties, haConf);
+ setRaftSnapshotProperties(properties, haConf);
+ return properties;
+ }
+
+ /**
+ * Set the local directory where ratis logs will be stored.
+ *
+ * @param properties RaftProperties instance which will be updated
+ * @param haConf SCMHAConfiguration
+ * @param conf ConfigurationSource
+ */
+ public static void setRaftStorageDir(final RaftProperties properties,
+ final SCMHAConfiguration haConf,
+ final ConfigurationSource conf) {
+ String storageDir = haConf.getRatisStorageDir();
+ if (Strings.isNullOrEmpty(storageDir)) {
+ storageDir = ServerUtils.getDefaultRatisDirectory(conf);
+ }
+ RaftServerConfigKeys.setStorageDir(properties,
+ Collections.singletonList(new File(storageDir)));
+ }
+
+ /**
+ * Set properties related to Raft RPC.
+ *
+ * @param properties RaftProperties instance which will be updated
+ * @param conf SCMHAConfiguration
+ */
+ private static void setRaftRpcProperties(final RaftProperties properties,
+ final SCMHAConfiguration conf) {
+ RaftConfigKeys.Rpc.setType(properties,
+ RpcType.valueOf(conf.getRatisRpcType()));
+ GrpcConfigKeys.Server.setPort(properties,
+ conf.getRatisBindAddress().getPort());
+ GrpcConfigKeys.setMessageSizeMax(properties,
+ SizeInBytes.valueOf("32m"));
+
+ Rpc.setRequestTimeout(properties, TimeDuration.valueOf(
+ conf.getRatisRequestTimeout(), TimeUnit.MILLISECONDS));
+ Rpc.setTimeoutMin(properties, TimeDuration.valueOf(
+ conf.getRatisRequestMinTimeout(), TimeUnit.MILLISECONDS));
+ Rpc.setTimeoutMax(properties, TimeDuration.valueOf(
+ conf.getRatisRequestMaxTimeout(), TimeUnit.MILLISECONDS));
+ Rpc.setSlownessTimeout(properties, TimeDuration.valueOf(
+ conf.getRatisNodeFailureTimeout(), TimeUnit.MILLISECONDS));
+ }
+
+ /**
+ * Set properties related to Raft Log.
+ *
+ * @param properties RaftProperties instance which will be updated
+ * @param conf SCMHAConfiguration
+ */
+ private static void setRaftLogProperties(final RaftProperties properties,
+ final SCMHAConfiguration conf) {
+ Log.setSegmentSizeMax(properties,
+ SizeInBytes.valueOf(conf.getRaftSegmentSize()));
+ Log.Appender.setBufferElementLimit(properties,
+ conf.getRaftLogAppenderQueueByteLimit());
+ Log.Appender.setBufferByteLimit(properties,
+ SizeInBytes.valueOf(conf.getRaftLogAppenderQueueByteLimit()));
+ Log.setPreallocatedSize(properties,
+ SizeInBytes.valueOf(conf.getRaftSegmentPreAllocatedSize()));
+ Log.Appender.setInstallSnapshotEnabled(properties, false);
+ Log.setPurgeGap(properties, conf.getRaftLogPurgeGap());
+ Log.setSegmentCacheNumMax(properties, 2);
+ }
+
+ /**
+ * Set properties related to Raft Retry Cache.
+ *
+ * @param properties RaftProperties instance which will be updated
+ * @param conf SCMHAConfiguration
+ */
+ private static void setRaftRetryCacheProperties(
+ final RaftProperties properties, final SCMHAConfiguration conf) {
+ RetryCache.setExpiryTime(properties, TimeDuration.valueOf(
+ conf.getRatisRetryCacheTimeout(), TimeUnit.MILLISECONDS));
+ }
+
+ /**
+ * Set properties related to Raft Snapshot.
+ *
+ * @param properties RaftProperties instance which will be updated
+ * @param conf SCMHAConfiguration
+ */
+ private static void setRaftSnapshotProperties(
+ final RaftProperties properties, final SCMHAConfiguration conf) {
+ Snapshot.setAutoTriggerEnabled(properties, true);
+ Snapshot.setAutoTriggerThreshold(properties, 400000);
+ }
+
+}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/ReflectionUtil.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/ReflectionUtil.java
new file mode 100644
index 0000000..7c54723
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/ReflectionUtil.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Reflection util for SCM HA.
+ */
+public final class ReflectionUtil {
+
+ private static Map<String, Class<?>> classCache = new HashMap<>();
+
+ private ReflectionUtil() {
+ }
+
+ /**
+ * Returns the {@code Class} object associated with the given string name.
+ *
+ * @param className the fully qualified name of the desired class.
+ * @return the {@code Class} object for the class with the
+ * specified name.
+ * @throws ClassNotFoundException if the class cannot be located
+ */
+ public static Class<?> getClass(String className)
+ throws ClassNotFoundException {
+ if (!classCache.containsKey(className)) {
+ classCache.put(className, Class.forName(className));
+ }
+ return classCache.get(className);
+ }
+
+ /**
+ * Returns a {@code Method} object that reflects the specified public
+ * member method of the given {@code Class} object.
+ *
+ * @param clazz the class object which has the method
+ * @param methodName the name of the method
+ * @param arg the list of parameters
+ * @return the {@code Method} object that matches the specified
+ * {@code name} and {@code parameterTypes}
+ * @throws NoSuchMethodException if a matching method is not found
+ * or if the name is "<init>"or "<clinit>".
+ */
+ public static Method getMethod(
+ final Class<?> clazz, final String methodName, final Class<?>... arg)
+ throws NoSuchMethodException {
+ return clazz.getMethod(methodName, arg);
+ }
+}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAConfiguration.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAConfiguration.java
new file mode 100644
index 0000000..1cb8c65
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAConfiguration.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * <p>Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import java.net.InetSocketAddress;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.conf.Config;
+import org.apache.hadoop.hdds.conf.ConfigGroup;
+import org.apache.hadoop.hdds.conf.ConfigType;
+import org.apache.hadoop.net.NetUtils;
+
+import static org.apache.hadoop.hdds.conf.ConfigTag.HA;
+import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE;
+import static org.apache.hadoop.hdds.conf.ConfigTag.RATIS;
+import static org.apache.hadoop.hdds.conf.ConfigTag.SCM;
+
+/**
+ * Configuration used by SCM HA.
+ */
+@ConfigGroup(prefix = "ozone.scm.ha")
+public class SCMHAConfiguration {
+
+ @Config(key = "ratis.storage.dir",
+ type = ConfigType.STRING,
+ defaultValue = "",
+ tags = {OZONE, SCM, HA, RATIS},
+ description = "Storage directory used by SCM to write Ratis logs."
+ )
+ private String ratisStorageDir;
+
+ @Config(key = "ratis.bind.host",
+ type = ConfigType.STRING,
+ defaultValue = "0.0.0.0",
+ tags = {OZONE, SCM, HA, RATIS},
+ description = "Host used by SCM for binding Ratis Server."
+ )
+ private String ratisBindHost = "0.0.0.0";
+
+ @Config(key = "ratis.bind.port",
+ type = ConfigType.STRING,
+ defaultValue = "9865",
+ tags = {OZONE, SCM, HA, RATIS},
+ description = "Port used by SCM for Ratis Server."
+ )
+ private int ratisBindPort = 9865;
+
+
+ @Config(key = "ratis.rpc.type",
+ type = ConfigType.STRING,
+ defaultValue = "GRPC",
+ tags = {SCM, OZONE, HA, RATIS},
+ description = "Ratis supports different kinds of transports like" +
+ " netty, GRPC, Hadoop RPC etc. This picks one of those for" +
+ " this cluster."
+ )
+ private String ratisRpcType;
+
+ @Config(key = "ratis.segment.size",
+ type = ConfigType.SIZE,
+ defaultValue = "16KB",
+ tags = {SCM, OZONE, HA, RATIS},
+ description = "The size of the raft segment used by Apache Ratis on" +
+ " SCM. (16 KB by default)"
+ )
+ private long raftSegmentSize = 16L * 1024L;
+
+ @Config(key = "ratis.segment.preallocated.size",
+ type = ConfigType.SIZE,
+ defaultValue = "16KB",
+ tags = {SCM, OZONE, HA, RATIS},
+ description = "The size of the buffer which is preallocated for" +
+ " raft segment used by Apache Ratis on SCM.(16 KB by default)"
+ )
+ private long raftSegmentPreAllocatedSize = 16 * 1024;
+
+ @Config(key = "ratis.log.appender.queue.num-elements",
+ type = ConfigType.INT,
+ defaultValue = "1024",
+ tags = {SCM, OZONE, HA, RATIS},
+ description = "Number of operation pending with Raft's Log Worker."
+ )
+ private int raftLogAppenderQueueNum = 1024;
+
+ @Config(key = "ratis.log.appender.queue.byte-limit",
+ type = ConfigType.SIZE,
+ defaultValue = "32MB",
+ tags = {SCM, OZONE, HA, RATIS},
+ description = "Byte limit for Raft's Log Worker queue."
+ )
+ private int raftLogAppenderQueueByteLimit = 32 * 1024 * 1024;
+
+ @Config(key = "ratis.log.purge.gap",
+ type = ConfigType.INT,
+ defaultValue = "1000000",
+ tags = {SCM, OZONE, HA, RATIS},
+ description = "The minimum gap between log indices for Raft server to" +
+ " purge its log segments after taking snapshot."
+ )
+ private int raftLogPurgeGap = 1000000;
+
+ @Config(key = "ratis.request.timeout",
+ type = ConfigType.TIME,
+ defaultValue = "3000ms",
+ tags = {SCM, OZONE, HA, RATIS},
+ description = "The timeout duration for SCM's Ratis server RPC."
+ )
+ private long ratisRequestTimeout = 3000L;
+
+ @Config(key = "ratis.server.retry.cache.timeout",
+ type = ConfigType.TIME,
+ defaultValue = "60s",
+ tags = {SCM, OZONE, HA, RATIS},
+ description = "Retry Cache entry timeout for SCM's ratis server."
+ )
+ private long ratisRetryCacheTimeout = 60 * 1000L;
+
+
+ @Config(key = "ratis.leader.election.timeout",
+ type = ConfigType.TIME,
+ defaultValue = "1s",
+ tags = {SCM, OZONE, HA, RATIS},
+ description = "The minimum timeout duration for SCM ratis leader" +
+ " election. Default is 1s."
+ )
+ private long ratisLeaderElectionTimeout = 1 * 1000L;
+
+ @Config(key = "ratis.server.failure.timeout.duration",
+ type = ConfigType.TIME,
+ defaultValue = "120s",
+ tags = {SCM, OZONE, HA, RATIS},
+ description = "The timeout duration for ratis server failure" +
+ " detection, once the threshold has reached, the ratis state" +
+ " machine will be informed about the failure in the ratis ring."
+ )
+ private long ratisNodeFailureTimeout = 120 * 1000L;
+
+ @Config(key = "ratis.server.role.check.interval",
+ type = ConfigType.TIME,
+ defaultValue = "15s",
+ tags = {SCM, OZONE, HA, RATIS},
+ description = "The interval between SCM leader performing a role" +
+ " check on its ratis server. Ratis server informs SCM if it loses" +
+ " the leader role. The scheduled check is an secondary check to" +
+ " ensure that the leader role is updated periodically"
+ )
+ private long ratisRoleCheckerInterval = 15 * 1000L;
+
+ public String getRatisStorageDir() {
+ return ratisStorageDir;
+ }
+
+ public InetSocketAddress getRatisBindAddress() {
+ return NetUtils.createSocketAddr(ratisBindHost, ratisBindPort);
+ }
+
+ public String getRatisRpcType() {
+ return ratisRpcType;
+ }
+
+ public long getRaftSegmentSize() {
+ return raftSegmentSize;
+ }
+
+ public long getRaftSegmentPreAllocatedSize() {
+ return raftSegmentPreAllocatedSize;
+ }
+
+ public int getRaftLogAppenderQueueNum() {
+ return raftLogAppenderQueueNum;
+ }
+
+ public int getRaftLogAppenderQueueByteLimit() {
+ return raftLogAppenderQueueByteLimit;
+ }
+
+ public int getRaftLogPurgeGap() {
+ return raftLogPurgeGap;
+ }
+
+ public long getRatisRetryCacheTimeout() {
+ return ratisRetryCacheTimeout;
+ }
+
+ public long getRatisRequestTimeout() {
+ Preconditions.checkArgument(ratisRequestTimeout > 1000L,
+ "Ratis request timeout cannot be less than 1000ms.");
+ return ratisRequestTimeout;
+ }
+
+ public long getRatisRequestMinTimeout() {
+ return ratisRequestTimeout - 1000L;
+ }
+
+ public long getRatisRequestMaxTimeout() {
+ return ratisRequestTimeout + 1000L;
+ }
+
+ public long getRatisLeaderElectionTimeout() {
+ return ratisLeaderElectionTimeout;
+ }
+
+ public long getRatisNodeFailureTimeout() {
+ return ratisNodeFailureTimeout;
+ }
+
+ public long getRatisRoleCheckerInterval() {
+ return ratisRoleCheckerInterval;
+ }
+}
\ No newline at end of file
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAInvocationHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAInvocationHandler.java
new file mode 100644
index 0000000..c78c616
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAInvocationHandler.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * <p>Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
+import org.apache.hadoop.hdds.scm.metadata.Replicate;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * InvocationHandler which checks for {@link Replicate} annotation and
+ * dispatches the request to Ratis Server.
+ */
+public class SCMHAInvocationHandler implements InvocationHandler {
+
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(SCMHAInvocationHandler.class);
+
+ private final RequestType requestType;
+ private final Object localHandler;
+ private final SCMRatisServer ratisHandler;
+
+ /**
+ * TODO.
+ */
+ public SCMHAInvocationHandler(final RequestType requestType,
+ final Object localHandler,
+ final SCMRatisServer ratisHandler) {
+ this.requestType = requestType;
+ this.localHandler = localHandler;
+ this.ratisHandler = ratisHandler;
+ }
+
+ @Override
+ public Object invoke(final Object proxy, final Method method,
+ final Object[] args) throws Throwable {
+ try {
+ long startTime = Time.monotonicNow();
+ final Object result = method.isAnnotationPresent(Replicate.class) ?
+ invokeRatis(method, args) : invokeLocal(method, args);
+ LOG.debug("Call: {} took {} ms", method, Time.monotonicNow() -
startTime);
+ return result;
+ } catch(InvocationTargetException iEx) {
+ throw iEx.getCause();
+ }
+ }
+
+ /**
+ * TODO.
+ */
+ private Object invokeLocal(Method method, Object[] args)
+ throws InvocationTargetException, IllegalAccessException {
+ LOG.trace("Invoking method {} on target {}", method, localHandler);
+ return method.invoke(method, args);
+ }
+
+ /**
+ * TODO.
+ */
+ private Object invokeRatis(Method method, Object[] args)
+ throws Exception {
+ LOG.trace("Invoking method {} on target {}", method, ratisHandler);
+ final SCMRatisResponse response = ratisHandler.submitRequest(
+ SCMRatisRequest.of(requestType, method.getName(), args));
+ if (response.isSuccess()) {
+ return response.getResult();
+ }
+ // Should we unwrap and throw proper exception from here?
+ throw response.getException();
+ }
+
+}
\ No newline at end of file
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
new file mode 100644
index 0000000..b38fc43
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * <p>Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+
+import java.io.IOException;
+
+/**
+ * SCMHAManager provides HA service for SCM.
+ *
+ * It uses Apache Ratis for HA implementation. We will have a 2N+1
+ * node Ratis ring. The Ratis ring will have one Leader node and 2N follower
+ * nodes.
+ *
+ * TODO
+ *
+ */
+public class SCMHAManager {
+
+ private static boolean isLeader = true;
+
+ private final SCMRatisServer ratisServer;
+
+ /**
+ * Creates SCMHAManager instance.
+ */
+ public SCMHAManager(final ConfigurationSource conf) throws IOException {
+ this.ratisServer = new SCMRatisServer(
+ conf.getObject(SCMHAConfiguration.class), conf);
+ }
+
+ /**
+ * Starts HA service.
+ */
+ public void start() throws IOException {
+ ratisServer.start();
+ }
+
+ /**
+ * Returns true if the current SCM is the leader.
+ */
+ public static boolean isLeader() {
+ return isLeader;
+ }
+
+ /**
+ * Returns RatisServer instance associated with the SCM instance.
+ */
+ public SCMRatisServer getRatisServer() {
+ return ratisServer;
+ }
+
+ /**
+ * Stops the HA service.
+ */
+ public void shutdown() throws IOException {
+ ratisServer.stop();
+ }
+
+}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisRequest.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisRequest.java
new file mode 100644
index 0000000..d65c235
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisRequest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.primitives.Ints;
+import com.google.protobuf.GeneratedMessage;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.ProtocolMessageEnum;
+
+import org.apache.ratis.protocol.Message;
+
+import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.Method;
+import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.MethodArgument;
+import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
+import
org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.SCMRatisRequestProto;
+
+
+/**
+ * Represents the request that is sent to RatisServer.
+ */
+public final class SCMRatisRequest {
+
+ private final RequestType type;
+ private final String operation;
+ private final Object[] arguments;
+
+ private SCMRatisRequest(final RequestType type, final String operation,
+ final Object... arguments) {
+ this.type = type;
+ this.operation = operation;
+ this.arguments = arguments;
+ }
+
+ public static SCMRatisRequest of(final RequestType type,
+ final String operation,
+ final Object... arguments) {
+ return new SCMRatisRequest(type, operation, arguments);
+ }
+
+ /**
+ * Returns the type of request.
+ */
+ public RequestType getType() {
+ return type;
+ }
+
+ /**
+ * Returns the operation that this request represents.
+ */
+ public String getOperation() {
+ return operation;
+ }
+
+ /**
+ * Returns the arguments encoded in the request.
+ */
+ public Object[] getArguments() {
+ return arguments.clone();
+ }
+
+ /**
+ * Encodes the request into Ratis Message.
+ */
+ public Message encode() throws InvalidProtocolBufferException {
+ final SCMRatisRequestProto.Builder requestProtoBuilder =
+ SCMRatisRequestProto.newBuilder();
+ requestProtoBuilder.setType(type);
+
+ final Method.Builder methodBuilder = Method.newBuilder();
+ methodBuilder.setName(operation);
+
+ final List<MethodArgument> args = new ArrayList<>();
+ for (Object argument : arguments) {
+ final MethodArgument.Builder argBuilder = MethodArgument.newBuilder();
+ argBuilder.setType(argument.getClass().getCanonicalName());
+ if (argument instanceof GeneratedMessage) {
+ argBuilder.setValue(((GeneratedMessage) argument).toByteString());
+ } else if (argument instanceof ProtocolMessageEnum) {
+ argBuilder.setValue(ByteString.copyFrom(Ints.toByteArray(
+ ((ProtocolMessageEnum) argument).getNumber())));
+ } else {
+ throw new InvalidProtocolBufferException(argument.getClass() +
+ " is not a protobuf object!");
+ }
+ args.add(argBuilder.build());
+ }
+ methodBuilder.addAllArgs(args);
+ return Message.valueOf(
+ org.apache.ratis.thirdparty.com.google.protobuf.ByteString.copyFrom(
+ requestProtoBuilder.build().toByteArray()));
+ }
+
+ /**
+ * Decodes the request from Ratis Message.
+ */
+ public static SCMRatisRequest decode(Message message)
+ throws InvalidProtocolBufferException {
+ final SCMRatisRequestProto requestProto =
+ SCMRatisRequestProto.parseFrom(message.getContent().toByteArray());
+ final Method method = requestProto.getMethod();
+ List<Object> args = new ArrayList<>();
+ for (MethodArgument argument : method.getArgsList()) {
+ try {
+ final Class<?> clazz = ReflectionUtil.getClass(argument.getType());
+ if (GeneratedMessage.class.isAssignableFrom(clazz)) {
+ args.add(ReflectionUtil.getMethod(clazz, "parseFrom", byte[].class)
+ .invoke(null, (Object) argument.getValue().toByteArray()));
+ } else if (Enum.class.isAssignableFrom(clazz)) {
+ args.add(ReflectionUtil.getMethod(clazz, "valueOf", int.class)
+ .invoke(null, Ints.fromByteArray(
+ argument.getValue().toByteArray())));
+ } else {
+ throw new InvalidProtocolBufferException(argument.getType() +
+ " is not a protobuf object!");
+ }
+ } catch (ClassNotFoundException | NoSuchMethodException |
+ IllegalAccessException | InvocationTargetException ex) {
+ throw new InvalidProtocolBufferException(argument.getType() +
+ " cannot be decoded!" + ex.getMessage());
+ }
+ }
+ return new SCMRatisRequest(requestProto.getType(),
+ method.getName(), args.toArray());
+ }
+
+}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisResponse.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisResponse.java
new file mode 100644
index 0000000..c4bedcc
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisResponse.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import java.lang.reflect.InvocationTargetException;
+import java.math.BigInteger;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.GeneratedMessage;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.ProtocolMessageEnum;
+import
org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.SCMRatisResponseProto;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+
+/**
+ * Represents the response from RatisServer.
+ */
+public final class SCMRatisResponse {
+
+ private final boolean success;
+ private final Object result;
+ private final Exception exception;
+
+ private SCMRatisResponse(final Object result) {
+ this(true, result, null);
+ }
+
+ private SCMRatisResponse(final Exception exception) {
+ this(false, null, exception);
+ }
+
+ private SCMRatisResponse(final boolean success, final Object result,
+ final Exception exception) {
+ this.success = success;
+ this.result = result;
+ this.exception = exception;
+ }
+
+ public boolean isSuccess() {
+ return success;
+ }
+
+ public Object getResult() {
+ return result;
+ }
+
+ public Exception getException() {
+ return exception;
+ }
+
+ public static Message encode(final Object result)
+ throws InvalidProtocolBufferException {
+
+ final ByteString value;
+ if (result instanceof GeneratedMessage) {
+ value = ((GeneratedMessage) result).toByteString();
+ } else if (result instanceof ProtocolMessageEnum) {
+ value = ByteString.copyFrom(BigInteger.valueOf(
+ ((ProtocolMessageEnum) result).getNumber()).toByteArray());
+ } else {
+ throw new InvalidProtocolBufferException(result.getClass() +
+ " is not a protobuf object!");
+ }
+
+ final SCMRatisResponseProto response =
+ SCMRatisResponseProto.newBuilder()
+ .setType(result.getClass().getCanonicalName())
+ .setValue(value)
+ .build();
+ return Message.valueOf(
+ org.apache.ratis.thirdparty.com.google.protobuf.ByteString.copyFrom(
+ response.toByteArray()));
+ }
+
+ public static SCMRatisResponse decode(RaftClientReply reply)
+ throws InvalidProtocolBufferException {
+ return reply.isSuccess() ?
+ new SCMRatisResponse(
+ deserializeResult(reply.getMessage().getContent().toByteArray())) :
+ new SCMRatisResponse(reply.getException());
+ }
+
+ private static Object deserializeResult(byte[] response)
+ throws InvalidProtocolBufferException {
+ final SCMRatisResponseProto responseProto =
+ SCMRatisResponseProto.parseFrom(response);
+ try {
+ final Class<?> clazz = ReflectionUtil.getClass(responseProto.getType());
+ if (GeneratedMessage.class.isAssignableFrom(clazz)) {
+ return ReflectionUtil.getMethod(clazz, "parseFrom", byte[].class)
+ .invoke(null, (Object) responseProto.getValue().toByteArray());
+ }
+
+ if (Enum.class.isAssignableFrom(clazz)) {
+ return ReflectionUtil.getMethod(clazz, "valueOf", int.class)
+ .invoke(null, new BigInteger(
+ responseProto.getValue().toByteArray()).intValue());
+ }
+
+ throw new InvalidProtocolBufferException(responseProto.getType() +
+ " is not a protobuf object!");
+
+ } catch (ClassNotFoundException | NoSuchMethodException |
+ IllegalAccessException | InvocationTargetException ex) {
+ throw new InvalidProtocolBufferException(responseProto.getType() +
+ " cannot be decoded!" + ex.getMessage());
+ }
+
+ }
+
+}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java
new file mode 100644
index 0000000..209535d
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServer;
+
+/**
+ * Ratis server hosting the SCM HA Raft group. Wraps a single-peer
+ * {@link RaftServer} driven by an {@link SCMStateMachine} and submits
+ * write requests to it.
+ */
+public class SCMRatisServer {
+
+  private final InetSocketAddress address;
+  private final RaftServer server;
+  private final RaftGroupId raftGroupId;
+  private final RaftGroup raftGroup;
+  private final RaftPeerId raftPeerId;
+  private final SCMStateMachine scmStateMachine;
+  private final ClientId clientId = ClientId.randomId();
+  private final AtomicLong callId = new AtomicLong();
+
+  // TODO: Refactor and remove ConfigurationSource and use only
+  // SCMHAConfiguration.
+  SCMRatisServer(final SCMHAConfiguration haConf,
+      final ConfigurationSource conf) throws IOException {
+    final String serviceId = "SCM-HA-Service";
+    final String nodeId = "localhost";
+    this.raftPeerId = RaftPeerId.getRaftPeerId(nodeId);
+    this.address = haConf.getRatisBindAddress();
+    // The group id is derived deterministically from the service id so every
+    // instance computes the same group.
+    this.raftGroupId = RaftGroupId.valueOf(
+        UUID.nameUUIDFromBytes(serviceId.getBytes(StandardCharsets.UTF_8)));
+    final List<RaftPeer> peers = new ArrayList<>();
+    peers.add(new RaftPeer(raftPeerId, address));
+    this.raftGroup = RaftGroup.valueOf(raftGroupId, peers);
+    this.scmStateMachine = new SCMStateMachine();
+    final RaftProperties properties =
+        RatisUtil.newRaftProperties(haConf, conf);
+    this.server = RaftServer.newBuilder()
+        .setServerId(raftPeerId)
+        .setGroup(raftGroup)
+        .setProperties(properties)
+        .setStateMachine(scmStateMachine)
+        .build();
+  }
+
+  /** Starts the underlying Raft server. */
+  void start() throws IOException {
+    server.start();
+  }
+
+  /**
+   * Registers the handler object that serves requests of the given type
+   * after they have been applied by the state machine.
+   */
+  public void registerStateMachineHandler(final RequestType handlerType,
+      final Object handler) {
+    scmStateMachine.registerHandler(handlerType, handler);
+  }
+
+  /**
+   * Replicates the given request through Raft and returns the decoded reply.
+   */
+  SCMRatisResponse submitRequest(SCMRatisRequest request)
+      throws IOException, ExecutionException, InterruptedException {
+    final RaftClientRequest raftRequest = new RaftClientRequest(
+        clientId, server.getId(), raftGroupId, nextCallId(), request.encode(),
+        RaftClientRequest.writeRequestType(), null);
+    final RaftClientReply reply =
+        server.submitClientRequestAsync(raftRequest).get();
+    return SCMRatisResponse.decode(reply);
+  }
+
+  // Monotonically increasing call id, masked to stay non-negative on wrap.
+  private long nextCallId() {
+    return callId.getAndIncrement() & Long.MAX_VALUE;
+  }
+
+  /** Stops the underlying Raft server. */
+  void stop() throws IOException {
+    server.close();
+  }
+
+}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
new file mode 100644
index 0000000..b10dd54
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * <p>Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
+
+import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
+
+/**
+ * Raft state machine for SCM HA. Decodes replicated {@link SCMRatisRequest}s
+ * and dispatches them, via reflection, to the handler registered for the
+ * request's type.
+ */
+public class SCMStateMachine extends BaseStateMachine {
+
+  // One handler object per request type; populated before requests arrive.
+  private final Map<RequestType, Object> handlers;
+
+  public SCMStateMachine() {
+    this.handlers = new EnumMap<>(RequestType.class);
+  }
+
+  /** Registers the object that serves requests of the given type. */
+  public void registerHandler(RequestType type, Object handler) {
+    handlers.put(type, handler);
+  }
+
+  @Override
+  public CompletableFuture<Message> applyTransaction(
+      final TransactionContext trx) {
+    final CompletableFuture<Message> applyTransactionFuture =
+        new CompletableFuture<>();
+    try {
+      final SCMRatisRequest request = SCMRatisRequest.decode(
+          trx.getClientRequest().getMessage());
+      applyTransactionFuture.complete(process(request));
+    } catch (Exception ex) {
+      applyTransactionFuture.completeExceptionally(ex);
+    }
+    return applyTransactionFuture;
+  }
+
+  /**
+   * Looks up the handler for the request type and reflectively invokes the
+   * method named by the request with the request's arguments, encoding the
+   * result as a Ratis message.
+   *
+   * @throws IOException if no handler is registered for the request type
+   * @throws Exception whatever the invoked handler method itself throws
+   */
+  private Message process(final SCMRatisRequest request)
+      throws Exception {
+    try {
+      final Object handler = handlers.get(request.getType());
+
+      if (handler == null) {
+        throw new IOException("No handler found for request type " +
+            request.getType());
+      }
+
+      final List<Class<?>> argumentTypes = new ArrayList<>();
+      for (Object arg : request.getArguments()) {
+        argumentTypes.add(arg.getClass());
+      }
+      final Object result = handler.getClass().getMethod(
+          request.getOperation(), argumentTypes.toArray(new Class<?>[0]))
+          .invoke(handler, request.getArguments());
+
+      return SCMRatisResponse.encode(result);
+    } catch (NoSuchMethodException | SecurityException ex) {
+      // Preserve the reflection failure as the cause instead of dropping it.
+      final InvalidProtocolBufferException ipe =
+          new InvalidProtocolBufferException(ex.getMessage());
+      ipe.initCause(ex);
+      throw ipe;
+    } catch (InvocationTargetException e) {
+      // Unwrap the handler's own exception. The original cast the target
+      // unconditionally to Exception, which would throw ClassCastException
+      // if the handler threw an Error; rethrow the wrapper in that case.
+      final Throwable cause = e.getCause();
+      if (cause instanceof Exception) {
+        throw (Exception) cause;
+      }
+      throw e;
+    }
+  }
+
+}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/Replicate.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/Replicate.java
new file mode 100644
index 0000000..aeed57c
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/Replicate.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * <p>Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.metadata;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Inherited;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Runtime-retained marker for methods, inherited by subclasses.
+ *
+ * NOTE(review): presumably marks methods whose invocations should be
+ * replicated through the SCM Ratis ring (consumed by
+ * SCMHAInvocationHandler / SCMRatisServer) — confirm against callers,
+ * the consumer is not visible in this file.
+ */
+@Inherited
+@Target(ElementType.METHOD)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface Replicate {
+}
diff --git a/hadoop-hdds/server-scm/src/main/proto/SCMRatisProtocol.proto
b/hadoop-hdds/server-scm/src/main/proto/SCMRatisProtocol.proto
new file mode 100644
index 0000000..1107016
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/proto/SCMRatisProtocol.proto
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.hadoop.hdds.protocol.proto";
+option java_outer_classname = "SCMRatisProtocol";
+option java_generate_equals_and_hash = true;
+
+// Categories of requests replicated through SCM's Ratis ring; each value
+// selects the handler registered for that type (see SCMRatisRequestProto).
+enum RequestType {
+ PIPELINE = 1;
+ CONTAINER = 2;
+}
+
+// A method invocation to replay on the handler: the method name plus its
+// encoded arguments.
+message Method {
+ required string name = 1;
+ repeated MethodArgument args = 2;
+}
+
+// One argument: its type name and serialized bytes. NOTE(review): the
+// encoding of `value` per `type` is defined by the Java encode/decode side.
+message MethodArgument {
+ required string type = 1;
+ required bytes value = 2;
+}
+
+// A replicated request: which handler type should serve it and which
+// method (with arguments) to invoke on that handler.
+message SCMRatisRequestProto {
+ required RequestType type = 1;
+ required Method method = 2;
+}
+
+// Result of a replicated request: the result object's type name and its
+// serialized bytes (see SCMRatisResponse encode/decode).
+message SCMRatisResponseProto {
+ // Field numbers start at 1 for consistency with the other messages in
+ // this file; the original skipped tag 1. Renumbering is safe because this
+ // message is introduced in this commit and has never been persisted or
+ // exchanged across versions.
+ required string type = 1;
+ required bytes value = 2;
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]