This is an automated email from the ASF dual-hosted git repository.

arp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cdc36fe  HDDS-1649. On installSnapshot notification from OM leader, 
download checkpoint and reload OM state (#948)
cdc36fe is described below

commit cdc36fe286708b5ff12675599da8c7650744f064
Author: Hanisha Koneru <hanishakon...@apache.org>
AuthorDate: Mon Jul 22 12:06:55 2019 -0700

    HDDS-1649. On installSnapshot notification from OM leader, download 
checkpoint and reload OM state (#948)
---
 .../java/org/apache/hadoop/ozone/OzoneConsts.java  |   1 +
 .../common/src/main/resources/ozone-default.xml    |   8 +
 .../org/apache/hadoop/ozone/om/OMConfigKeys.java   |   3 +
 .../hadoop/ozone/om/exceptions/OMException.java    |   3 +-
 .../ozone/om/protocol/OzoneManagerHAProtocol.java  |   3 +-
 .../src/main/proto/OzoneManagerProtocol.proto      |   2 +
 .../org/apache/hadoop/ozone/MiniOzoneCluster.java  |   6 +
 .../hadoop/ozone/MiniOzoneHAClusterImpl.java       |  49 ++-
 .../hadoop/ozone/om/TestOMRatisSnapshots.java      | 189 +++++++++++
 .../apache/hadoop/ozone/om/TestOzoneManagerHA.java |   7 +-
 .../hadoop/ozone/om/OMDBCheckpointServlet.java     |   2 +-
 .../java/org/apache/hadoop/ozone/om/OMMetrics.java |   9 +-
 .../org/apache/hadoop/ozone/om/OzoneManager.java   | 359 ++++++++++++++++-----
 .../ozone/om/ratis/OzoneManagerRatisServer.java    |  15 +-
 .../ozone/om/ratis/OzoneManagerStateMachine.java   |  81 ++++-
 .../om/snapshot/OzoneManagerSnapshotProvider.java  |   2 +-
 16 files changed, 637 insertions(+), 102 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index d28e477..67bd22d 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -119,6 +119,7 @@ public final class OzoneConsts {
   public static final String DN_CONTAINER_DB = "-dn-"+ CONTAINER_DB_SUFFIX;
   public static final String DELETED_BLOCK_DB = "deletedBlock.db";
   public static final String OM_DB_NAME = "om.db";
+  public static final String OM_DB_BACKUP_PREFIX = "om.db.backup.";
   public static final String OM_DB_CHECKPOINTS_DIR_NAME = "om.db.checkpoints";
   public static final String OZONE_MANAGER_TOKEN_DB_NAME = "om-token.db";
   public static final String SCM_DB_NAME = "scm.db";
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml 
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 30cf386..b2f820b 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1630,6 +1630,14 @@
     <description>Byte limit for Raft's Log Worker queue.
     </description>
   </property>
+  <property>
+    <name>ozone.om.ratis.log.purge.gap</name>
+    <value>1000000</value>
+    <tag>OZONE, OM, RATIS</tag>
+    <description>The minimum gap between log indices for Raft server to purge
+      its log segments after taking snapshot.
+    </description>
+  </property>
 
   <property>
     <name>ozone.om.ratis.snapshot.auto.trigger.threshold</name>
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index 14b6783..35431fa 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -123,6 +123,9 @@ public final class OMConfigKeys {
       "ozone.om.ratis.log.appender.queue.byte-limit";
   public static final String
       OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT = "32MB";
+  public static final String OZONE_OM_RATIS_LOG_PURGE_GAP =
+      "ozone.om.ratis.log.purge.gap";
+  public static final int OZONE_OM_RATIS_LOG_PURGE_GAP_DEFAULT = 1000000;
 
   // OM Snapshot configurations
   public static final String OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
index 66ce1cc..78bdb21 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
@@ -203,7 +203,8 @@ public class OMException extends IOException {
 
     PREFIX_NOT_FOUND,
 
-    S3_BUCKET_INVALID_LENGTH
+    S3_BUCKET_INVALID_LENGTH,
 
+    RATIS_ERROR // Error in Ratis server
   }
 }
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java
index 1434dca..675c814 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java
@@ -29,9 +29,10 @@ public interface OzoneManagerHAProtocol {
   /**
    * Store the snapshot index i.e. the raft log index, corresponding to the
    * last transaction applied to the OM RocksDB, in OM metadata dir on disk.
+   * @param flush flush the OM DB to disk if true
    * @return the snapshot index
    * @throws IOException
    */
-  long saveRatisSnapshot() throws IOException;
+  long saveRatisSnapshot(boolean flush) throws IOException;
 
 }
diff --git a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto 
b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
index 7007d98..5dd2b55 100644
--- a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
+++ b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
@@ -278,6 +278,8 @@ enum Status {
     PREFIX_NOT_FOUND=50;
 
     S3_BUCKET_INVALID_LENGTH = 51; // s3 bucket invalid length.
+
+    RATIS_ERROR = 52;
 }
 
 
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index 4927da1..1139a65 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -241,6 +241,7 @@ public interface MiniOzoneCluster {
     protected String clusterId;
     protected String omServiceId;
     protected int numOfOMs;
+    protected int numOfActiveOMs;
 
     protected Optional<Boolean> enableTrace = Optional.of(false);
     protected Optional<Integer> hbInterval = Optional.empty();
@@ -440,6 +441,11 @@ public interface MiniOzoneCluster {
       return this;
     }
 
+    public Builder setNumOfActiveOMs(int numActiveOMs) {
+      this.numOfActiveOMs = numActiveOMs;
+      return this;
+    }
+
     public Builder setStreamBufferSizeUnit(StorageUnit unit) {
       this.streamBufferSizeUnit = Optional.of(unit);
       return this;
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
index 4d00710..1d9a99e 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
@@ -53,6 +53,10 @@ public final class MiniOzoneHAClusterImpl extends 
MiniOzoneClusterImpl {
   private Map<String, OzoneManager> ozoneManagerMap;
   private List<OzoneManager> ozoneManagers;
 
+  // Active OMs denote OMs which are up and running
+  private List<OzoneManager> activeOMs;
+  private List<OzoneManager> inactiveOMs;
+
   private static final Random RANDOM = new Random();
   private static final int RATIS_LEADER_ELECTION_TIMEOUT = 1000; // 1 seconds
 
@@ -67,11 +71,15 @@ public final class MiniOzoneHAClusterImpl extends 
MiniOzoneClusterImpl {
   private MiniOzoneHAClusterImpl(
       OzoneConfiguration conf,
       Map<String, OzoneManager> omMap,
+      List<OzoneManager> activeOMList,
+      List<OzoneManager> inactiveOMList,
       StorageContainerManager scm,
       List<HddsDatanodeService> hddsDatanodes) {
     super(conf, scm, hddsDatanodes);
     this.ozoneManagerMap = omMap;
     this.ozoneManagers = new ArrayList<>(omMap.values());
+    this.activeOMs = activeOMList;
+    this.inactiveOMs = inactiveOMList;
   }
 
   /**
@@ -83,6 +91,10 @@ public final class MiniOzoneHAClusterImpl extends 
MiniOzoneClusterImpl {
     return this.ozoneManagers.get(0);
   }
 
+  public boolean isOMActive(String omNodeId) {
+    return activeOMs.contains(ozoneManagerMap.get(omNodeId));
+  }
+
   public OzoneManager getOzoneManager(int index) {
     return this.ozoneManagers.get(index);
   }
@@ -91,6 +103,20 @@ public final class MiniOzoneHAClusterImpl extends 
MiniOzoneClusterImpl {
     return this.ozoneManagerMap.get(omNodeId);
   }
 
+  /**
+   * Start a previously inactive OM.
+   */
+  public void startInactiveOM(String omNodeID) throws IOException {
+    OzoneManager ozoneManager = ozoneManagerMap.get(omNodeID);
+    if (!inactiveOMs.contains(ozoneManager)) {
+      throw new IOException("OM is already active.");
+    } else {
+      ozoneManager.start();
+      activeOMs.add(ozoneManager);
+      inactiveOMs.remove(ozoneManager);
+    }
+  }
+
   @Override
   public void restartOzoneManager() throws IOException {
     for (OzoneManager ozoneManager : ozoneManagers) {
@@ -125,6 +151,8 @@ public final class MiniOzoneHAClusterImpl extends 
MiniOzoneClusterImpl {
   public static class Builder extends MiniOzoneClusterImpl.Builder {
 
     private final String nodeIdBaseStr = "omNode-";
+    private List<OzoneManager> activeOMs = new ArrayList<>();
+    private List<OzoneManager> inactiveOMs = new ArrayList<>();
 
     /**
      * Creates a new Builder.
@@ -137,6 +165,10 @@ public final class MiniOzoneHAClusterImpl extends 
MiniOzoneClusterImpl {
 
     @Override
     public MiniOzoneCluster build() throws IOException {
+      if (numOfActiveOMs > numOfOMs) {
+        throw new IllegalArgumentException("Number of active OMs cannot be " +
+            "more than the total number of OMs");
+      }
       DefaultMetricsSystem.setMiniClusterMode(true);
       initializeConfiguration();
       StorageContainerManager scm;
@@ -150,8 +182,8 @@ public final class MiniOzoneHAClusterImpl extends 
MiniOzoneClusterImpl {
       }
 
       final List<HddsDatanodeService> hddsDatanodes = createHddsDatanodes(scm);
-      MiniOzoneHAClusterImpl cluster = new MiniOzoneHAClusterImpl(conf, omMap,
-          scm, hddsDatanodes);
+      MiniOzoneHAClusterImpl cluster = new MiniOzoneHAClusterImpl(
+          conf, omMap, activeOMs, inactiveOMs, scm, hddsDatanodes);
       if (startDataNodes) {
         cluster.startHddsDatanodes();
       }
@@ -215,9 +247,16 @@ public final class MiniOzoneHAClusterImpl extends 
MiniOzoneClusterImpl {
             om.setCertClient(certClient);
             omMap.put(nodeId, om);
 
-            om.start();
-            LOG.info("Started OzoneManager RPC server at " +
-                om.getOmRpcServerAddr());
+            if (i <= numOfActiveOMs) {
+              om.start();
+              activeOMs.add(om);
+              LOG.info("Started OzoneManager RPC server at " +
+                  om.getOmRpcServerAddr());
+            } else {
+              inactiveOMs.add(om);
+              LOG.info("Intialized OzoneManager at " + om.getOmRpcServerAddr()
+                  + ". This OM is currently inactive (not running).");
+            }
           }
 
           // Set default OM address to point to the first OM. Clients would
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
new file mode 100644
index 0000000..6ac28c3
--- /dev/null
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.VolumeArgs;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.utils.db.DBCheckpoint;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.hadoop.ozone.om.TestOzoneManagerHA.createKey;
+
+/**
+ * Tests the Ratis snapshots feature in OM.
+ */
+public class TestOMRatisSnapshots {
+
+  private MiniOzoneHAClusterImpl cluster = null;
+  private ObjectStore objectStore;
+  private OzoneConfiguration conf;
+  private String clusterId;
+  private String scmId;
+  private int numOfOMs = 3;
+  private static final long SNAPSHOT_THRESHOLD = 50;
+  private static final int LOG_PURGE_GAP = 50;
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  @Rule
+  public Timeout timeout = new Timeout(500_000);
+
+  /**
+   * Create a MiniOzoneCluster for testing. The cluster initially has one
+   * inactive OM. So at the start of the cluster, there will be 2 active and 1
+   * inactive OM.
+   *
+   * @throws Exception if the cluster fails to start
+   */
+  @Before
+  public void init() throws Exception {
+    conf = new OzoneConfiguration();
+    clusterId = UUID.randomUUID().toString();
+    scmId = UUID.randomUUID().toString();
+    conf.setLong(
+        OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY,
+        SNAPSHOT_THRESHOLD);
+    conf.setInt(OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_GAP, LOG_PURGE_GAP);
+    cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newHABuilder(conf)
+        .setClusterId(clusterId)
+        .setScmId(scmId)
+        .setOMServiceId("om-service-test1")
+        .setNumOfOzoneManagers(numOfOMs)
+        .setNumOfActiveOMs(2)
+        .build();
+    cluster.waitForClusterToBeReady();
+    objectStore = OzoneClientFactory.getRpcClient(conf).getObjectStore();
+  }
+
+  /**
+   * Shut down the MiniOzoneCluster.
+   */
+  @After
+  public void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testInstallSnapshot() throws Exception {
+    // Get the leader OM
+    String leaderOMNodeId = objectStore.getClientProxy().getOMProxyProvider()
+        .getCurrentProxyOMNodeId();
+    OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId);
+    OzoneManagerRatisServer leaderRatisServer = leaderOM.getOmRatisServer();
+
+    // Find the inactive OM
+    String followerNodeId = leaderOM.getPeerNodes().get(0).getOMNodeId();
+    if (cluster.isOMActive(followerNodeId)) {
+      followerNodeId = leaderOM.getPeerNodes().get(1).getOMNodeId();
+    }
+    OzoneManager followerOM = cluster.getOzoneManager(followerNodeId);
+
+    // Do some transactions so that the log index increases
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
+
+    VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
+        .setOwner(userName)
+        .setAdmin(adminName)
+        .build();
+
+    objectStore.createVolume(volumeName, createVolumeArgs);
+    OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName);
+
+    retVolumeinfo.createBucket(bucketName);
+    OzoneBucket ozoneBucket = retVolumeinfo.getBucket(bucketName);
+
+    long leaderOMappliedLogIndex =
+        leaderRatisServer.getStateMachineLastAppliedIndex();
+    leaderOM.getOmRatisServer().getStateMachineLastAppliedIndex();
+
+    List<String> keys = new ArrayList<>();
+    while (leaderOMappliedLogIndex < 2000) {
+      keys.add(createKey(ozoneBucket));
+      leaderOMappliedLogIndex =
+          leaderRatisServer.getStateMachineLastAppliedIndex();
+    }
+
+    // Get the latest db checkpoint from the leader OM.
+    long leaderOMSnaphsotIndex = leaderOM.saveRatisSnapshot(true);
+    DBCheckpoint leaderDbCheckpoint =
+        leaderOM.getMetadataManager().getStore().getCheckpoint(false);
+
+    // Start the inactive OM
+    cluster.startInactiveOM(followerNodeId);
+
+    // The recently started OM should be lagging behind the leader OM.
+    long followerOMLastAppliedIndex =
+        followerOM.getOmRatisServer().getStateMachineLastAppliedIndex();
+    Assert.assertTrue(
+        followerOMLastAppliedIndex < leaderOMSnaphsotIndex);
+
+    // Install leader OM's db checkpoint on the lagging OM.
+    followerOM.getOmRatisServer().getOmStateMachine().pause();
+    followerOM.getMetadataManager().getStore().close();
+    followerOM.replaceOMDBWithCheckpoint(
+        leaderOMSnaphsotIndex, leaderDbCheckpoint.getCheckpointLocation());
+
+    // Reload the follower OM with new DB checkpoint from the leader OM.
+    followerOM.reloadOMState(leaderOMSnaphsotIndex);
+    followerOM.getOmRatisServer().getOmStateMachine().unpause(
+        leaderOMSnaphsotIndex);
+
+    // After the new checkpoint is loaded and state machine is unpaused, the
+    // follower OM lastAppliedIndex must match the snapshot index of the
+    // checkpoint.
+    followerOMLastAppliedIndex = followerOM.getOmRatisServer()
+        .getStateMachineLastAppliedIndex();
+    Assert.assertEquals(leaderOMSnaphsotIndex, followerOMLastAppliedIndex);
+
+    // Verify that the follower OM's DB contains the transactions which were
+    // made while it was inactive.
+    OMMetadataManager followerOMMetaMngr = followerOM.getMetadataManager();
+    Assert.assertNotNull(followerOMMetaMngr.getVolumeTable().get(
+        followerOMMetaMngr.getVolumeKey(volumeName)));
+    Assert.assertNotNull(followerOMMetaMngr.getBucketTable().get(
+        followerOMMetaMngr.getBucketKey(volumeName, bucketName)));
+    for (String key : keys) {
+      Assert.assertNotNull(followerOMMetaMngr.getKeyTable().get(
+          followerOMMetaMngr.getOzoneKey(volumeName, bucketName, key)));
+    }
+  }
+}
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
index 05c53b3..92fc263 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
@@ -829,7 +829,11 @@ public class TestOzoneManagerHA {
 
   }
 
-  private void createKey(OzoneBucket ozoneBucket) throws IOException {
+  /**
+   * Create a key in the bucket.
+   * @return the key name.
+   */
+  static String createKey(OzoneBucket ozoneBucket) throws IOException {
     String keyName = "key" + RandomStringUtils.randomNumeric(5);
     String data = "data" + RandomStringUtils.randomNumeric(5);
     OzoneOutputStream ozoneOutputStream = ozoneBucket.createKey(keyName,
@@ -837,5 +841,6 @@ public class TestOzoneManagerHA {
         ReplicationFactor.ONE, new HashMap<>());
     ozoneOutputStream.write(data.getBytes(), 0, data.length());
     ozoneOutputStream.close();
+    return keyName;
   }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
index d54e121..b36a128 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
@@ -126,7 +126,7 @@ public class OMDBCheckpointServlet extends HttpServlet {
         // ratis snapshot first. This step also included flushing the OM DB.
         // Hence, we can set flush to false.
         flush = false;
-        ratisSnapshotIndex = om.saveRatisSnapshot();
+        ratisSnapshotIndex = om.saveRatisSnapshot(true);
       } else {
         ratisSnapshotIndex = om.loadRatisSnapshotIndex();
       }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
index 46fdabd..dbadf68 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
@@ -196,15 +196,18 @@ public class OMMetrics {
   }
 
   public void setNumVolumes(long val) {
-    this.numVolumes.incr(val);
+    long oldVal = this.numVolumes.value();
+    this.numVolumes.incr(val - oldVal);
   }
 
   public void setNumBuckets(long val) {
-    this.numBuckets.incr(val);
+    long oldVal = this.numBuckets.value();
+    this.numBuckets.incr(val - oldVal);
   }
 
   public void setNumKeys(long val) {
-    this.numKeys.incr(val);
+    long oldVal = this.numKeys.value();
+    this.numKeys.incr(val- oldVal);
   }
 
   public long getNumVolumes() {
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 4312516..0267350 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -25,6 +25,7 @@ import com.google.common.base.Preconditions;
 import com.google.protobuf.BlockingService;
 
 import java.net.InetAddress;
+import java.nio.file.Path;
 import java.security.PrivateKey;
 import java.security.PublicKey;
 import java.security.KeyPair;
@@ -143,6 +144,10 @@ import org.apache.hadoop.util.KMSUtil;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.utils.RetriableTask;
+import org.apache.hadoop.utils.db.DBCheckpoint;
+import org.apache.hadoop.utils.db.DBStore;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.util.FileUtils;
 import org.apache.ratis.util.LifeCycle;
 import org.bouncycastle.pkcs.PKCS10CertificationRequest;
 import org.slf4j.Logger;
@@ -236,18 +241,20 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
   private RPC.Server omRpcServer;
   private InetSocketAddress omRpcAddress;
   private String omId;
-  private final OMMetadataManager metadataManager;
-  private final VolumeManager volumeManager;
-  private final BucketManager bucketManager;
-  private final KeyManager keyManager;
-  private final PrefixManagerImpl prefixManager;
+
+  private OMMetadataManager metadataManager;
+  private VolumeManager volumeManager;
+  private BucketManager bucketManager;
+  private KeyManager keyManager;
+  private PrefixManagerImpl prefixManager;
+  private S3BucketManager s3BucketManager;
+
   private final OMMetrics metrics;
   private OzoneManagerHttpServer httpServer;
   private final OMStorage omStorage;
   private final ScmBlockLocationProtocol scmBlockClient;
   private final StorageContainerLocationProtocol scmContainerClient;
   private ObjectName omInfoBeanName;
-  private final S3BucketManager s3BucketManager;
   private Timer metricsTimer;
   private ScheduleOMMetricsWriteTask scheduleOMMetricsWriteTask;
   private static final ObjectWriter WRITER =
@@ -258,7 +265,7 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
   private final Runnable shutdownHook;
   private final File omMetaDir;
   private final boolean isAclEnabled;
-  private final IAccessAuthorizer accessAuthorizer;
+  private IAccessAuthorizer accessAuthorizer;
   private JvmPauseMonitor jvmPauseMonitor;
   private final SecurityConfig secConfig;
   private S3SecretManager s3SecretManager;
@@ -308,12 +315,37 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
       throw new OMException("OM not initialized.",
           ResultCodes.OM_NOT_INITIALIZED);
     }
+
+    // Read configuration and set values.
+    ozAdmins = conf.getTrimmedStringCollection(OZONE_ADMINISTRATORS);
+    omMetaDir = OmUtils.getOmDbDir(configuration);
+    this.isAclEnabled = conf.getBoolean(OZONE_ACL_ENABLED,
+        OZONE_ACL_ENABLED_DEFAULT);
+    this.scmBlockSize = (long) conf.getStorageSize(OZONE_SCM_BLOCK_SIZE,
+        OZONE_SCM_BLOCK_SIZE_DEFAULT, StorageUnit.BYTES);
+    this.preallocateBlocksMax = conf.getInt(
+        OZONE_KEY_PREALLOCATION_BLOCKS_MAX,
+        OZONE_KEY_PREALLOCATION_BLOCKS_MAX_DEFAULT);
+    this.grpcBlockTokenEnabled = conf.getBoolean(HDDS_BLOCK_TOKEN_ENABLED,
+        HDDS_BLOCK_TOKEN_ENABLED_DEFAULT);
+    this.useRatisForReplication = conf.getBoolean(
+        DFS_CONTAINER_RATIS_ENABLED_KEY, DFS_CONTAINER_RATIS_ENABLED_DEFAULT);
+    // TODO: This is a temporary check. Once fully implemented, all OM state
+    //  change should go through Ratis - be it standalone (for non-HA) or
+    //  replicated (for HA).
+    isRatisEnabled = configuration.getBoolean(
+        OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY,
+        OMConfigKeys.OZONE_OM_RATIS_ENABLE_DEFAULT);
+
     // Load HA related configurations
     loadOMHAConfigs(configuration);
+    InetSocketAddress omNodeRpcAddr = omNodeDetails.getRpcAddress();
+    omRpcAddressTxt = new Text(omNodeDetails.getRpcAddressString());
 
     scmContainerClient = getScmContainerClient(configuration);
     // verifies that the SCM info in the OM Version file is correct.
     scmBlockClient = getScmBlockClient(configuration);
+    this.scmClient = new ScmClient(scmBlockClient, scmContainerClient);
 
     // For testing purpose only, not hit scm from om as Hadoop UGI can't login
     // two principals in the same JVM.
@@ -329,16 +361,32 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
     RPC.setProtocolEngine(configuration, OzoneManagerProtocolPB.class,
         ProtobufRpcEngine.class);
 
-    metadataManager = new OmMetadataManagerImpl(configuration);
+    secConfig = new SecurityConfig(configuration);
+    // Create the KMS Key Provider
+    try {
+      kmsProvider = createKeyProviderExt(configuration);
+    } catch (IOException ioe) {
+      kmsProvider = null;
+      LOG.error("Fail to create Key Provider");
+    }
+    if (secConfig.isSecurityEnabled()) {
+      omComponent = OM_DAEMON + "-" + omId;
+      if(omStorage.getOmCertSerialId() == null) {
+        throw new RuntimeException("OzoneManager started in secure mode but " +
+            "doesn't have SCM signed certificate.");
+      }
+      certClient = new OMCertificateClient(new SecurityConfig(conf),
+          omStorage.getOmCertSerialId());
+    }
+    if (secConfig.isBlockTokenEnabled()) {
+      blockTokenMgr = createBlockTokenSecretManager(configuration);
+    }
+
+    instantiateServices();
+
+    initializeRatisServer();
+    initializeRatisClient();
 
-    // This is a temporary check. Once fully implemented, all OM state change
-    // should go through Ratis - be it standalone (for non-HA) or replicated
-    // (for HA).
-    isRatisEnabled = configuration.getBoolean(
-        OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY,
-        OMConfigKeys.OZONE_OM_RATIS_ENABLE_DEFAULT);
-    startRatisServer();
-    startRatisClient();
     if (isRatisEnabled) {
       // Create Ratis storage dir
       String omRatisDirectory = OmUtils.getOMRatisDirectory(configuration);
@@ -361,59 +409,44 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
         OM_RATIS_SNAPSHOT_INDEX);
     this.snapshotIndex = loadRatisSnapshotIndex();
 
-    InetSocketAddress omNodeRpcAddr = omNodeDetails.getRpcAddress();
-    omRpcAddressTxt = new Text(omNodeDetails.getRpcAddressString());
-    secConfig = new SecurityConfig(configuration);
-    volumeManager = new VolumeManagerImpl(metadataManager, configuration);
+    metrics = OMMetrics.create();
 
-    // Create the KMS Key Provider
-    try {
-      kmsProvider = createKeyProviderExt(configuration);
-    } catch (IOException ioe) {
-      kmsProvider = null;
-      LOG.error("Fail to create Key Provider");
-    }
+    // Start Om Rpc Server.
+    omRpcServer = getRpcServer(conf);
+    omRpcAddress = updateRPCListenAddress(configuration,
+        OZONE_OM_ADDRESS_KEY, omNodeRpcAddr, omRpcServer);
+
+    shutdownHook = () -> {
+      saveOmMetrics();
+    };
+    ShutdownHookManager.get().addShutdownHook(shutdownHook,
+        SHUTDOWN_HOOK_PRIORITY);
+  }
+
+  /**
+   * Instantiate services which are dependent on the OM DB state.
+   * When OM state is reloaded, these services are re-initialized with the
+   * new OM state.
+   */
+  private void instantiateServices() throws IOException {
 
+    metadataManager = new OmMetadataManagerImpl(configuration);
+    volumeManager = new VolumeManagerImpl(metadataManager, configuration);
     bucketManager = new BucketManagerImpl(metadataManager, getKmsProvider(),
         isRatisEnabled);
-    metrics = OMMetrics.create();
-
     s3BucketManager = new S3BucketManagerImpl(configuration, metadataManager,
         volumeManager, bucketManager);
     if (secConfig.isSecurityEnabled()) {
-      omComponent = OM_DAEMON + "-" + omId;
-      if(omStorage.getOmCertSerialId() == null) {
-        throw new RuntimeException("OzoneManager started in secure mode but " +
-            "doesn't have SCM signed certificate.");
-      }
-      certClient = new OMCertificateClient(new SecurityConfig(conf),
-          omStorage.getOmCertSerialId());
       s3SecretManager = new S3SecretManagerImpl(configuration, 
metadataManager);
       delegationTokenMgr = createDelegationTokenSecretManager(configuration);
     }
-    if (secConfig.isBlockTokenEnabled()) {
-      blockTokenMgr = createBlockTokenSecretManager(configuration);
-    }
-
-    omRpcServer = getRpcServer(conf);
-    omRpcAddress = updateRPCListenAddress(configuration,
-        OZONE_OM_ADDRESS_KEY, omNodeRpcAddr, omRpcServer);
-
-    this.scmClient = new ScmClient(scmBlockClient, scmContainerClient);
 
     prefixManager = new PrefixManagerImpl(metadataManager);
     keyManager = new KeyManagerImpl(this, scmClient, configuration,
         omStorage.getOmId());
 
-    shutdownHook = () -> {
-      saveOmMetrics();
-    };
-    ShutdownHookManager.get().addShutdownHook(shutdownHook,
-        SHUTDOWN_HOOK_PRIORITY);
-    isAclEnabled = conf.getBoolean(OZONE_ACL_ENABLED,
-            OZONE_ACL_ENABLED_DEFAULT);
     if (isAclEnabled) {
-      accessAuthorizer = getACLAuthorizerInstance(conf);
+      accessAuthorizer = getACLAuthorizerInstance(configuration);
       if (accessAuthorizer instanceof OzoneNativeAuthorizer) {
         OzoneNativeAuthorizer authorizer =
             (OzoneNativeAuthorizer) accessAuthorizer;
@@ -425,17 +458,6 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
     } else {
       accessAuthorizer = null;
     }
-    ozAdmins = conf.getTrimmedStringCollection(OZONE_ADMINISTRATORS);
-    omMetaDir = OmUtils.getOmDbDir(configuration);
-    this.scmBlockSize = (long) conf.getStorageSize(OZONE_SCM_BLOCK_SIZE,
-        OZONE_SCM_BLOCK_SIZE_DEFAULT, StorageUnit.BYTES);
-    this.preallocateBlocksMax = conf.getInt(
-        OZONE_KEY_PREALLOCATION_BLOCKS_MAX,
-        OZONE_KEY_PREALLOCATION_BLOCKS_MAX_DEFAULT);
-    this.grpcBlockTokenEnabled = conf.getBoolean(HDDS_BLOCK_TOKEN_ENABLED,
-        HDDS_BLOCK_TOKEN_ENABLED_DEFAULT);
-    this.useRatisForReplication = conf.getBoolean(
-        DFS_CONTAINER_RATIS_ENABLED_KEY, DFS_CONTAINER_RATIS_ENABLED_DEFAULT);
   }
 
   /**
@@ -1235,6 +1257,14 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
 
     DefaultMetricsSystem.initialize("OzoneManager");
 
+    // Start Ratis services
+    if (omRatisServer != null) {
+      omRatisServer.start();
+    }
+    if (omRatisClient != null) {
+      omRatisClient.connect();
+    }
+
     metadataManager.start(configuration);
     startSecretManagerIfNecessary();
 
@@ -1305,8 +1335,14 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
     omRpcServer.start();
     isOmRpcServerRunning = true;
 
-    startRatisServer();
-    startRatisClient();
+    initializeRatisServer();
+    if (omRatisServer != null) {
+      omRatisServer.start();
+    }
+    initializeRatisClient();
+    if (omRatisClient != null) {
+      omRatisClient.connect();
+    }
 
     try {
       httpServer = new OzoneManagerHttpServer(configuration, this);
@@ -1353,15 +1389,13 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
   /**
    * Creates an instance of ratis server.
    */
-  private void startRatisServer() throws IOException {
+  private void initializeRatisServer() throws IOException {
     if (isRatisEnabled) {
       if (omRatisServer == null) {
         omRatisServer = OzoneManagerRatisServer.newOMRatisServer(
             configuration, this, omNodeDetails, peerNodes);
       }
-      omRatisServer.start();
-
-      LOG.info("OzoneManager Ratis server started at port {}",
+      LOG.info("OzoneManager Ratis server initialized at port {}",
           omRatisServer.getServerPort());
     } else {
       omRatisServer = null;
@@ -1371,14 +1405,13 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
   /**
    * Creates an instance of ratis client.
    */
-  private void startRatisClient() throws IOException {
+  private void initializeRatisClient() throws IOException {
     if (isRatisEnabled) {
       if (omRatisClient == null) {
         omRatisClient = OzoneManagerRatisClient.newOzoneManagerRatisClient(
             omNodeDetails.getOMNodeId(), omRatisServer.getRaftGroup(),
             configuration);
       }
-      omRatisClient.connect();
     } else {
       omRatisClient = null;
     }
@@ -1398,11 +1431,13 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
   }
 
   @Override
-  public long saveRatisSnapshot() throws IOException {
+  public long saveRatisSnapshot(boolean flush) throws IOException {
     snapshotIndex = omRatisServer.getStateMachineLastAppliedIndex();
 
-    // Flush the OM state to disk
-    getMetadataManager().getStore().flush();
+    if (flush) {
+      // Flush the OM state to disk
+      metadataManager.getStore().flush();
+    }
 
     PersistentLongFile.writeFile(ratisSnapshotFile, snapshotIndex);
     LOG.info("Saved Ratis Snapshot on the OM with snapshotIndex {}",
@@ -2697,7 +2732,6 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
       }
     }
   }
-
   @Override
   public OmMultipartInfo initiateMultipartUpload(OmKeyArgs keyArgs) throws
       IOException {
@@ -3069,6 +3103,179 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
     }
   }
 
+  /**
+   * Download and install latest checkpoint from leader OM.
+   * If the download checkpoints snapshot index is greater than this OM's
+   * last applied transaction index, then re-initialize the OM state via this
+   * checkpoint. Before re-initializing OM state, the OM Ratis server should
+   * be stopped so that no new transactions can be applied.
+   * @param leaderId peerNodeID of the leader OM
+   * @return If checkpoint is installed, return the corresponding termIndex.
+   * Otherwise, return null.
+   */
+  public TermIndex installSnapshot(String leaderId) {
+    if (omSnapshotProvider == null) {
+      LOG.error("OM Snapshot Provider is not configured as there are no peer " 
+
+          "nodes.");
+      return null;
+    }
+
+    DBCheckpoint omDBcheckpoint = getDBCheckpointFromLeader(leaderId);
+    Path newDBlocation = omDBcheckpoint.getCheckpointLocation();
+
+    // Check if current ratis log index is smaller than the downloaded
+    // snapshot index. If yes, proceed by stopping the ratis server so that
+    // the OM state can be re-initialized. If no, then do not proceed with
+    // installSnapshot.
+    long lastAppliedIndex = omRatisServer.getStateMachineLastAppliedIndex();
+    long checkpointSnapshotIndex = omDBcheckpoint.getRatisSnapshotIndex();
+    if (checkpointSnapshotIndex <= lastAppliedIndex) {
+      LOG.error("Failed to install checkpoint from OM leader: {}. The last " +
+          "applied index: {} is greater than or equal to the checkpoint's " +
+          "snapshot index: {}. Deleting the downloaded checkpoint {}", 
leaderId,
+          lastAppliedIndex, checkpointSnapshotIndex,
+          newDBlocation);
+      try {
+        FileUtils.deleteFully(newDBlocation);
+      } catch (IOException e) {
+        LOG.error("Failed to fully delete the downloaded DB checkpoint {} " +
+            "from OM leader {}.", newDBlocation,
+            leaderId, e);
+      }
+      return null;
+    }
+
+    // Pause the State Machine so that no new transactions can be applied.
+    // This action also clears the OM Double Buffer so that if there are any
+    // pending transactions in the buffer, they are discarded.
+    // TODO: The Ratis server should also be paused here. This is required
+    //  because a leader election might happen while the snapshot
+    //  installation is in progress and the new leader might start sending
+    //  append log entries to the ratis server.
+    omRatisServer.getOmStateMachine().pause();
+
+    File dbBackup;
+    try {
+      dbBackup = replaceOMDBWithCheckpoint(lastAppliedIndex, newDBlocation);
+    } catch (Exception e) {
+      LOG.error("OM DB checkpoint replacement with new downloaded checkpoint " 
+
+          "failed.", e);
+      return null;
+    }
+
+    // Reload the OM DB store with the new checkpoint.
+    // Restart (unpause) the state machine and update its last applied index
+    // to the installed checkpoint's snapshot index.
+    try {
+      reloadOMState(checkpointSnapshotIndex);
+      omRatisServer.getOmStateMachine().unpause(checkpointSnapshotIndex);
+    } catch (IOException e) {
+      LOG.error("Failed to reload OM state with new DB checkpoint.", e);
+      return null;
+    }
+
+    // Delete the backup DB
+    try {
+      FileUtils.deleteFully(dbBackup);
+    } catch (IOException e) {
+      LOG.error("Failed to delete the backup of the original DB {}", dbBackup);
+    }
+
+    // TODO: We should only return the snpashotIndex to the leader.
+    //  Should be fixed after RATIS-586
+    TermIndex newTermIndex = TermIndex.newTermIndex(0,
+        checkpointSnapshotIndex);
+
+    return newTermIndex;
+  }
+
+  /**
+   * Fetch the most recent OM DB checkpoint from the leader OM through the
+   * snapshot provider.
+   * @param leaderId OMNodeID of the leader OM node.
+   * @return the checkpoint obtained from the leader OM, or null if the
+   * download failed with an IOException (the failure is logged).
+   */
+  private DBCheckpoint getDBCheckpointFromLeader(String leaderId) {
+    LOG.info("Downloading checkpoint from leader OM {} and reloading state " +
+        "from the checkpoint.", leaderId);
+
+    DBCheckpoint leaderCheckpoint = null;
+    try {
+      leaderCheckpoint =
+          omSnapshotProvider.getOzoneManagerDBSnapshot(leaderId);
+    } catch (IOException downloadFailure) {
+      LOG.error("Failed to download checkpoint from OM leader {}", leaderId,
+          downloadFailure);
+    }
+    return leaderCheckpoint;
+  }
+
+  /**
+   * Replace the current OM DB with the new DB checkpoint.
+   * The current DB store is closed and its directory is moved aside as a
+   * backup before the checkpoint is moved into its place. If moving the
+   * checkpoint fails, the backup is moved back to the original location.
+   * @param lastAppliedIndex the last applied index in the current OM DB.
+   * @param checkpointPath path to the new DB checkpoint
+   * @return location of the backup of the original DB
+   * @throws Exception if closing the DB store fails, or if the backup or
+   * the checkpoint cannot be moved into place. When restoring the backup
+   * also fails, that failure is attached as a suppressed exception.
+   */
+  File replaceOMDBWithCheckpoint(long lastAppliedIndex, Path checkpointPath)
+      throws Exception {
+    // Stop the DB first
+    DBStore store = metadataManager.getStore();
+    store.close();
+
+    // Take a backup of the current DB
+    File db = store.getDbLocation();
+    String dbBackupName = OzoneConsts.OM_DB_BACKUP_PREFIX +
+        lastAppliedIndex + "_" + System.currentTimeMillis();
+    File dbBackup = new File(db.getParentFile(), dbBackupName);
+
+    try {
+      Files.move(db.toPath(), dbBackup.toPath());
+    } catch (IOException e) {
+      LOG.error("Failed to create a backup of the current DB. Aborting " +
+          "snapshot installation.", e);
+      throw e;
+    }
+
+    // Move the new DB checkpoint into the om metadata dir
+    try {
+      Files.move(checkpointPath, db.toPath());
+    } catch (IOException e) {
+      LOG.error("Failed to move downloaded DB checkpoint {} to metadata " +
+          "directory {}. Resetting to original DB.", checkpointPath,
+          db.toPath(), e);
+      try {
+        Files.move(dbBackup.toPath(), db.toPath());
+      } catch (IOException rollbackFailure) {
+        // Keep the original failure as the primary cause; record the failed
+        // rollback so that it is not lost.
+        LOG.error("Failed to restore the original DB from backup {}.",
+            dbBackup, rollbackFailure);
+        e.addSuppressed(rollbackFailure);
+      }
+      throw e;
+    }
+    return dbBackup;
+  }
+
+  /**
+   * Re-instantiate MetadataManager with new DB checkpoint.
+   * All the classes which use/ store MetadataManager should also be updated
+   * with the new MetadataManager instance.
+   * @param newSnapshotIndex snapshot index of the newly installed DB state;
+   *                         it is recorded in memory and written to the
+   *                         Ratis snapshot file.
+   * @throws IOException if a service fails to restart, or the metrics file
+   * or the snapshot index file cannot be updated.
+   */
+  void reloadOMState(long newSnapshotIndex) throws IOException {
+
+    instantiateServices();
+
+    // Restart required services
+    metadataManager.start(configuration);
+    keyManager.start(configuration);
+
+    // Set metrics and start metrics back ground thread
+    metrics.setNumVolumes(metadataManager.countRowsInTable(metadataManager
+        .getVolumeTable()));
+    metrics.setNumBuckets(metadataManager.countRowsInTable(metadataManager
+        .getBucketTable()));
+
+    // Delete the omMetrics file if it exists and save a new metrics file
+    // with new data
+    Files.deleteIfExists(getMetricsStorageFile().toPath());
+    saveOmMetrics();
+
+    // Update OM snapshot index with the new snapshot index (from the new OM
+    // DB state) and save the snapshot index to disk.
+    // The index is written directly rather than through saveRatisSnapshot():
+    // that method first resets snapshotIndex from the state machine's last
+    // applied index, which installSnapshot() only advances (via unpause)
+    // after this method returns, so the stale index would be persisted.
+    this.snapshotIndex = newSnapshotIndex;
+    PersistentLongFile.writeFile(ratisSnapshotFile, snapshotIndex);
+    LOG.info("Saved Ratis Snapshot on the OM with snapshotIndex {}",
+        snapshotIndex);
+  }
+
   public static  Logger getLogger() {
     return LOG;
   }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
index 49a84da..1e51273 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
@@ -308,10 +308,15 @@ public final class OzoneManagerRatisServer {
   }
 
   /**
-   * Returns OzoneManager StateMachine.
+   * Initializes and returns OzoneManager StateMachine.
    */
   private OzoneManagerStateMachine getStateMachine() {
-    return  new OzoneManagerStateMachine(this);
+    return new OzoneManagerStateMachine(this);
+  }
+
+  /**
+   * Returns the OzoneManager StateMachine instance held by this Ratis
+   * server. Not test-only: OzoneManager#installSnapshot calls it to pause
+   * and unpause the state machine around a checkpoint installation.
+   */
+  public OzoneManagerStateMachine getOmStateMachine() {
+    return omStateMachine;
   }
 
   public OzoneManager getOzoneManager() {
@@ -387,6 +392,12 @@ public final class OzoneManagerRatisServer {
         SizeInBytes.valueOf(logAppenderQueueByteLimit));
     RaftServerConfigKeys.Log.setPreallocatedSize(properties,
         SizeInBytes.valueOf(raftSegmentPreallocatedSize));
+    RaftServerConfigKeys.Log.Appender.setInstallSnapshotEnabled(properties,
+        false);
+    final int logPurgeGap = conf.getInt(
+        OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_GAP,
+        OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_GAP_DEFAULT);
+    RaftServerConfigKeys.Log.setPurgeGap(properties, logPurgeGap);
 
     // For grpc set the maximum message size
     // TODO: calculate the optimal max message size
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
index 31c467d..c51323e 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
@@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis
     .ContainerStateMachine;
 import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMRequest;
@@ -43,12 +44,15 @@ import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
 import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.LifeCycle;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -68,8 +72,9 @@ public class OzoneManagerStateMachine extends 
BaseStateMachine {
   private OzoneManagerHARequestHandler handler;
   private RaftGroupId raftGroupId;
   private long lastAppliedIndex = 0;
-  private final OzoneManagerDoubleBuffer ozoneManagerDoubleBuffer;
+  private OzoneManagerDoubleBuffer ozoneManagerDoubleBuffer;
   private final ExecutorService executorService;
+  private final ExecutorService installSnapshotExecutor;
 
   public OzoneManagerStateMachine(OzoneManagerRatisServer ratisServer) {
     this.omRatisServer = ratisServer;
@@ -82,19 +87,20 @@ public class OzoneManagerStateMachine extends 
BaseStateMachine {
     ThreadFactory build = new ThreadFactoryBuilder().setDaemon(true)
         .setNameFormat("OM StateMachine ApplyTransaction Thread - %d").build();
     this.executorService = HadoopExecutors.newSingleThreadExecutor(build);
+    this.installSnapshotExecutor = HadoopExecutors.newSingleThreadExecutor();
   }
 
   /**
    * Initializes the State Machine with the given server, group and storage.
-   * TODO: Load the latest snapshot from the file system.
    */
   @Override
-  public void initialize(
-      RaftServer server, RaftGroupId id, RaftStorage raftStorage)
-      throws IOException {
-    super.initialize(server, id, raftStorage);
-    this.raftGroupId = id;
-    storage.init(raftStorage);
+  public void initialize(RaftServer server, RaftGroupId id,
+      RaftStorage raftStorage) throws IOException {
+    lifeCycle.startAndTransition(() -> {
+      super.initialize(server, id, raftStorage);
+      this.raftGroupId = id;
+      storage.init(raftStorage);
+    });
   }
 
   /**
@@ -185,6 +191,27 @@ public class OzoneManagerStateMachine extends 
BaseStateMachine {
     }
   }
 
+  /**
+   * Pause the StateMachine: move the life cycle through PAUSING to PAUSED
+   * and then stop the OM DoubleBuffer. A new DoubleBuffer is created when
+   * {@link #unpause(long)} is called.
+   */
+  @Override
+  public void pause() {
+    // The life cycle is moved to PAUSED before the double buffer is stopped.
+    lifeCycle.transition(LifeCycle.State.PAUSING);
+    lifeCycle.transition(LifeCycle.State.PAUSED);
+    ozoneManagerDoubleBuffer.stop();
+  }
+
+  /**
+   * Unpause the StateMachine, re-initialize the DoubleBuffer and update the
+   * lastAppliedIndex. This should be done after uploading new state to the
+   * StateMachine.
+   */
+  public void unpause(long newLastAppliedSnaphsotIndex) {
+    lifeCycle.startAndTransition(() -> {
+      this.ozoneManagerDoubleBuffer =
+          new OzoneManagerDoubleBuffer(ozoneManager.getMetadataManager(),
+              this::updateLastAppliedIndex);
+      this.updateLastAppliedIndex(newLastAppliedSnaphsotIndex);
+    });
+  }
+
   /**
    * Take OM Ratis snapshot. Write the snapshot index to file. Snapshot index
    * is the log index corresponding to the last applied transaction on the OM
@@ -197,12 +224,45 @@ public class OzoneManagerStateMachine extends 
BaseStateMachine {
   public long takeSnapshot() throws IOException {
     LOG.info("Saving Ratis snapshot on the OM.");
     if (ozoneManager != null) {
-      return ozoneManager.saveRatisSnapshot();
+      return ozoneManager.saveRatisSnapshot(true);
     }
     return 0;
   }
 
   /**
+   * Leader OM has purged entries from its log. To catch up, OM must download
+   * the latest checkpoint from the leader OM and install it. The
+   * installation runs asynchronously on the install snapshot executor.
+   * @param roleInfoProto the leader node information
+   * @param firstTermIndexInLog TermIndex of the first append entry available
+   *                           in the Leader's log.
+   * @return a future holding the last term index included in the installed
+   * snapshot (null if no snapshot was installed). The future is completed
+   * exceptionally if the notification did not come from a leader.
+   */
+  @Override
+  public CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(
+      RaftProtos.RoleInfoProto roleInfoProto, TermIndex firstTermIndexInLog) {
+
+    String leaderNodeId = RaftPeerId.valueOf(roleInfoProto.getSelf().getId())
+        .toString();
+
+    LOG.info("Received install snapshot notification from OM leader: {} " +
+        "with term index: {}", leaderNodeId, firstTermIndexInLog);
+
+    if (!roleInfoProto.getRole().equals(RaftProtos.RaftPeerRole.LEADER)) {
+      // A non-leader Ratis server should not send this notification.
+      LOG.error("Received Install Snapshot notification from non-leader OM " +
+          "node: {}. Ignoring the notification.", leaderNodeId);
+      return completeExceptionally(new OMException("Received notification " +
+          "to install snapshot from non-leader OM node",
+          OMException.ResultCodes.RATIS_ERROR));
+    }
+
+    // Downloading and installing the checkpoint blocks, so it is run on the
+    // dedicated executor instead of the calling thread.
+    return CompletableFuture.supplyAsync(
+        () -> ozoneManager.installSnapshot(leaderNodeId),
+        installSnapshotExecutor);
+  }
+
+  /**
    * Notifies the state machine that the raft peer is no longer leader.
    */
   @Override
@@ -276,10 +336,9 @@ public class OzoneManagerStateMachine extends 
BaseStateMachine {
     this.raftGroupId = raftGroupId;
   }
 
-
   public void stop() {
     ozoneManagerDoubleBuffer.stop();
     HadoopExecutors.shutdown(executorService, LOG, 5, TimeUnit.SECONDS);
+    HadoopExecutors.shutdown(installSnapshotExecutor, LOG, 5, 
TimeUnit.SECONDS);
   }
-
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OzoneManagerSnapshotProvider.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OzoneManagerSnapshotProvider.java
index e1d4889..87446db 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OzoneManagerSnapshotProvider.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OzoneManagerSnapshotProvider.java
@@ -149,7 +149,7 @@ public class OzoneManagerSnapshotProvider {
    * @param leaderOMNodeID leader OM Node ID.
    * @return the DB checkpoint (including the ratis snapshot index)
    */
-  protected DBCheckpoint getOzoneManagerDBSnapshot(String leaderOMNodeID)
+  public DBCheckpoint getOzoneManagerDBSnapshot(String leaderOMNodeID)
       throws IOException {
     String snapshotFileName = OM_SNAPSHOT_DB + "_" + 
System.currentTimeMillis();
     File targetFile = new File(omSnapshotDir, snapshotFileName + ".tar.gz");


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to