hanishakoneru commented on a change in pull request #986:
URL: https://github.com/apache/hadoop-ozone/pull/986#discussion_r433429260



##########
File path: 
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
##########
@@ -133,13 +133,6 @@ private OMConfigKeys() {
       "ozone.om.ratis.log.purge.gap";
   public static final int OZONE_OM_RATIS_LOG_PURGE_GAP_DEFAULT = 1000000;
 
-  // OM Snapshot configurations
-  public static final String OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY
-      = "ozone.om.ratis.snapshot.auto.trigger.threshold";

Review comment:
       We need this config to control the snapshot trigger on the Ratis side 
for testing purposes. For some tests (TestOzoneManagerHA, OM HA robot tests) we 
need to set a lower auto-trigger threshold so that logs can be purged, which in 
turn triggers install-snapshot on followers that fall behind. 
   We can keep it as an internal config and not expose it to users.
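   A minimal sketch of keeping the threshold as an internal, undocumented setting. The key name comes from the removed constant; the default value and the `InternalOmRatisKeys` holder are hypothetical, and `java.util.Properties` stands in for `OzoneConfiguration`.

```java
import java.util.Properties;

/** Hypothetical holder for an internal (not user-facing) OM Ratis setting. */
final class InternalOmRatisKeys {
  private InternalOmRatisKeys() { }

  // Key name taken from the constant removed in this diff.
  static final String SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY =
      "ozone.om.ratis.snapshot.auto.trigger.threshold";

  // Placeholder default; the real value is whatever the PR settles on.
  static final long SNAPSHOT_AUTO_TRIGGER_THRESHOLD_DEFAULT = 400_000L;

  /** Reads the threshold, falling back to the default when it is unset. */
  static long readAutoTriggerThreshold(Properties conf) {
    String raw = conf.getProperty(SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY);
    if (raw == null || raw.trim().isEmpty()) {
      return SNAPSHOT_AUTO_TRIGGER_THRESHOLD_DEFAULT;
    }
    long value = Long.parseLong(raw.trim());
    if (value <= 0) {
      throw new IllegalArgumentException(
          SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY + " must be positive: " + value);
    }
    return value;
  }
}
```

   A test such as TestOzoneManagerHA would then set the key to a small value so that Ratis snapshots, and the log purges that follow them, happen quickly; the OM would pass the value on to Ratis (for example through `RaftServerConfigKeys.Snapshot.setAutoTriggerThreshold`).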

##########
File path: 
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
##########
@@ -3027,30 +3017,59 @@ public TermIndex installSnapshot(String leaderId) {
     DBCheckpoint omDBcheckpoint = getDBCheckpointFromLeader(leaderId);
     Path newDBlocation = omDBcheckpoint.getCheckpointLocation();
 
-    // Check if current ratis log index is smaller than the downloaded
-    // snapshot index. If yes, proceed by stopping the ratis server so that
-    // the OM state can be re-initialized. If no, then do not proceed with
-    // installSnapshot.
     long lastAppliedIndex = omRatisServer.getLastAppliedTermIndex().getIndex();
-    long checkpointSnapshotIndex = omDBcheckpoint.getRatisSnapshotIndex();
-    long checkpointSnapshotTermIndex =
-        omDBcheckpoint.getRatisSnapshotTerm();
-    if (checkpointSnapshotIndex <= lastAppliedIndex) {
-      LOG.error("Failed to install checkpoint from OM leader: {}. The last " +
-          "applied index: {} is greater than or equal to the checkpoint's " +
-          "snapshot index: {}. Deleting the downloaded checkpoint {}", leaderId,
-          lastAppliedIndex, checkpointSnapshotIndex,
-          newDBlocation);
-      try {
-        FileUtils.deleteFully(newDBlocation);
-      } catch (IOException e) {
-        LOG.error("Failed to fully delete the downloaded DB checkpoint {} " +
-            "from OM leader {}.", newDBlocation,
-            leaderId, e);
+
+    // Check if current ratis log index is smaller than the downloaded
+    // checkpoint transaction index. If yes, proceed by stopping the ratis
+    // server so that the OM state can be re-initialized. If no, then do not
+    // proceed with installSnapshot.
+
+    OMTransactionInfo omTransactionInfo = null;
+    try {
+      // Set new DB location as DB path
+      OzoneConfiguration tempConfig = getConfiguration();
+
+      Path dbDir = newDBlocation.getParent();
+      if (dbDir != null) {
+        tempConfig.set(OZONE_OM_DB_DIRS, dbDir.toString());
+      } else {
+        LOG.error("Incorrect DB location path {} received from checkpoint.",
+            newDBlocation);
+        return null;
+      }
+
+      OMMetadataManager tempMetadataMgr =
+          new OmMetadataManagerImpl(configuration);

Review comment:
       Instead of instantiating a new MetadataManagerImpl, can we not directly 
load the RocksDB and read the transactionInfo?
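   A rough sketch of the suggestion, assuming the transaction info is persisted as a `"<term>#<index>"` string under one well-known key of the transaction info table. A plain `Map` stands in for that RocksDB column family; in the real code it would be a read-only RocksDB handle opened on the checkpoint directory, so no full metadata manager is needed. `CheckpointTxnInfo`, the key name and the encoding are all assumptions.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical (term, index) pair read directly from a downloaded checkpoint. */
final class CheckpointTxnInfo {
  // Assumed key of the single row in the transaction info table.
  static final String TRANSACTION_INFO_KEY = "#TRANSACTIONINFO";

  final long term;
  final long index;

  CheckpointTxnInfo(long term, long index) {
    this.term = term;
    this.index = index;
  }

  /** Parses the assumed "<term>#<index>" string encoding. */
  static CheckpointTxnInfo parse(String encoded) {
    String[] parts = encoded.split("#");
    if (parts.length != 2) {
      throw new IllegalArgumentException("Malformed transaction info: " + encoded);
    }
    return new CheckpointTxnInfo(Long.parseLong(parts[0]), Long.parseLong(parts[1]));
  }

  /**
   * Looks up the transaction info row. The Map stands in for the
   * checkpoint's transaction info column family, opened read-only.
   */
  static Optional<CheckpointTxnInfo> read(Map<String, String> transactionInfoTable) {
    String value = transactionInfoTable.get(TRANSACTION_INFO_KEY);
    return value == null ? Optional.empty() : Optional.of(parse(value));
  }
}
```

   In installSnapshot the result would be compared against lastAppliedIndex, with an empty result treated as an invalid checkpoint.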

##########
File path: 
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
##########
@@ -3027,30 +3017,59 @@ public TermIndex installSnapshot(String leaderId) {
     DBCheckpoint omDBcheckpoint = getDBCheckpointFromLeader(leaderId);
     Path newDBlocation = omDBcheckpoint.getCheckpointLocation();
 
-    // Check if current ratis log index is smaller than the downloaded
-    // snapshot index. If yes, proceed by stopping the ratis server so that
-    // the OM state can be re-initialized. If no, then do not proceed with
-    // installSnapshot.
     long lastAppliedIndex = omRatisServer.getLastAppliedTermIndex().getIndex();
-    long checkpointSnapshotIndex = omDBcheckpoint.getRatisSnapshotIndex();
-    long checkpointSnapshotTermIndex =
-        omDBcheckpoint.getRatisSnapshotTerm();
-    if (checkpointSnapshotIndex <= lastAppliedIndex) {
-      LOG.error("Failed to install checkpoint from OM leader: {}. The last " +
-          "applied index: {} is greater than or equal to the checkpoint's " +
-          "snapshot index: {}. Deleting the downloaded checkpoint {}", leaderId,
-          lastAppliedIndex, checkpointSnapshotIndex,
-          newDBlocation);
-      try {
-        FileUtils.deleteFully(newDBlocation);
-      } catch (IOException e) {
-        LOG.error("Failed to fully delete the downloaded DB checkpoint {} " +
-            "from OM leader {}.", newDBlocation,
-            leaderId, e);
+
+    // Check if current ratis log index is smaller than the downloaded
+    // checkpoint transaction index. If yes, proceed by stopping the ratis
+    // server so that the OM state can be re-initialized. If no, then do not
+    // proceed with installSnapshot.
+
+    OMTransactionInfo omTransactionInfo = null;
+    try {
+      // Set new DB location as DB path
+      OzoneConfiguration tempConfig = getConfiguration();
+
+      Path dbDir = newDBlocation.getParent();
+      if (dbDir != null) {
+        tempConfig.set(OZONE_OM_DB_DIRS, dbDir.toString());
+      } else {
+        LOG.error("Incorrect DB location path {} received from checkpoint.",
+            newDBlocation);
+        return null;
+      }
+
+      OMMetadataManager tempMetadataMgr =
+          new OmMetadataManagerImpl(configuration);
+
+      omTransactionInfo =
+          OMTransactionInfo.readTransactionInfo(tempMetadataMgr);
+      tempMetadataMgr.stop();
+
+      if (omTransactionInfo.getTransactionIndex() <= lastAppliedIndex) {
+        LOG.error("Failed to install checkpoint from OM leader: {}. The last " +
+                "applied index: {} is greater than or equal to the " +
+                "checkpoint's applied index: {}. Deleting the downloaded " +
+                "checkpoint {}", leaderId, lastAppliedIndex,
+            omTransactionInfo.getTransactionIndex(), newDBlocation);
+        try {
+          FileUtils.deleteFully(newDBlocation);
+        } catch (IOException e) {
+          LOG.error("Failed to fully delete the downloaded DB checkpoint {} " +
+                  "from OM leader {}.", newDBlocation,
+              leaderId, e);
+          return null;
+        }
       }
+    } catch (Exception ex) {
+      LOG.error("Failed during checking downloaded leader transaction index " +

Review comment:
       The exception caught here could be due to various reasons. Can we have a 
more generic error message, or use separate try blocks? Also, if there is an 
exception, we should delete the downloaded checkpoint.
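   One way to act on both points, sketched with hypothetical names: keep the read of the checkpoint's transaction info in its own step with a specific message, and delete the downloaded checkpoint on every failure path, not only when the index check fails. `CheckpointValidator` and the `IndexReader` callback are assumptions; `System.err` stands in for `LOG`, and deletion uses plain `java.nio.file` rather than Ozone's `FileUtils.deleteFully`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

final class CheckpointValidator {
  /** Hypothetical callback that reads the last transaction index of a checkpoint. */
  interface IndexReader {
    long readTransactionIndex(Path checkpoint) throws IOException;
  }

  /**
   * Returns the checkpoint's transaction index if it is ahead of
   * lastAppliedIndex; otherwise, or on any failure, deletes the
   * checkpoint and returns -1.
   */
  static long validate(Path checkpoint, long lastAppliedIndex, IndexReader reader) {
    long checkpointIndex;
    try {
      checkpointIndex = reader.readTransactionIndex(checkpoint);
    } catch (IOException | RuntimeException e) {
      // Stand-in for LOG.error with a message specific to this step.
      System.err.println("Failed to read transaction info from downloaded checkpoint "
          + checkpoint + ": " + e);
      deleteQuietly(checkpoint);
      return -1;
    }
    if (checkpointIndex <= lastAppliedIndex) {
      System.err.println("Checkpoint index " + checkpointIndex
          + " is not ahead of last applied index " + lastAppliedIndex);
      deleteQuietly(checkpoint);
      return -1;
    }
    return checkpointIndex;
  }

  /** Recursively deletes a directory tree, logging instead of throwing. */
  static void deleteQuietly(Path root) {
    if (!Files.exists(root)) {
      return;
    }
    try (Stream<Path> paths = Files.walk(root)) {
      // Reverse order visits children before their parent directories.
      paths.sorted(Comparator.reverseOrder()).forEach(p -> {
        try {
          Files.delete(p);
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      });
    } catch (IOException | UncheckedIOException e) {
      System.err.println("Failed to fully delete " + root + ": " + e);
    }
  }
}
```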

##########
File path: 
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
##########
@@ -515,13 +515,29 @@ private synchronized void computeAndUpdateLastAppliedIndex(
     }
   }
 
-  public void updateLastAppliedIndexWithSnaphsotIndex() {
+  public void updateLastAppliedIndexWithSnaphsotIndex() throws IOException {
     // This is done, as we have a check in Ratis for not throwing
     // LeaderNotReadyException, it checks stateMachineIndex >= raftLog
     // nextIndex (placeHolderIndex).
-    setLastAppliedTermIndex(TermIndex.newTermIndex(snapshotInfo.getTerm(),
-        snapshotInfo.getIndex()));
-    LOG.info("LastAppliedIndex set from SnapShotInfo {}",
+
+    OMTransactionInfo omTransactionInfo =
+        OMTransactionInfo.readTransactionInfo(
+            ozoneManager.getMetadataManager());
+
+    if (omTransactionInfo != null) {
+      setLastAppliedTermIndex(TermIndex.newTermIndex(
+          omTransactionInfo.getCurrentTerm(),
+          omTransactionInfo.getTransactionIndex()));
+      snapshotInfo.updateTermIndex(omTransactionInfo.getCurrentTerm(),
+          omTransactionInfo.getTransactionIndex());
+    } else {
+      // On a newly setup OM,it will not have any transaction info in DB,
+      // use default values.
+      setLastAppliedTermIndex(TermIndex.newTermIndex(snapshotInfo.getTerm(),

Review comment:
       On a newly set up OM, snapshotInfo is initialized to (0, -1), and so is 
LastAppliedTermIndex. This step is not required, as it sets the same values 
again.

##########
File path: 
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
##########
@@ -515,13 +515,29 @@ private synchronized void computeAndUpdateLastAppliedIndex(
     }
   }
 
-  public void updateLastAppliedIndexWithSnaphsotIndex() {
+  public void updateLastAppliedIndexWithSnaphsotIndex() throws IOException {
     // This is done, as we have a check in Ratis for not throwing
     // LeaderNotReadyException, it checks stateMachineIndex >= raftLog
     // nextIndex (placeHolderIndex).
-    setLastAppliedTermIndex(TermIndex.newTermIndex(snapshotInfo.getTerm(),
-        snapshotInfo.getIndex()));
-    LOG.info("LastAppliedIndex set from SnapShotInfo {}",
+
+    OMTransactionInfo omTransactionInfo =
+        OMTransactionInfo.readTransactionInfo(
+            ozoneManager.getMetadataManager());
+
+    if (omTransactionInfo != null) {
+      setLastAppliedTermIndex(TermIndex.newTermIndex(
+          omTransactionInfo.getCurrentTerm(),
+          omTransactionInfo.getTransactionIndex()));
+      snapshotInfo.updateTermIndex(omTransactionInfo.getCurrentTerm(),
+          omTransactionInfo.getTransactionIndex());
+    } else {
+      // On a newly setup OM,it will not have any transaction info in DB,
+      // use default values.
+      setLastAppliedTermIndex(TermIndex.newTermIndex(snapshotInfo.getTerm(),
+          snapshotInfo.getIndex()));
+    }
+
+    LOG.info("LastAppliedIndex is set from TransactionInfo from OM DB is {}",

Review comment:
       Typo: "is {}" should be "as {}".

##########
File path: 
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
##########
@@ -338,20 +341,17 @@ public void unpause(long newLastAppliedSnaphsotIndex,
   }
 
   /**
-   * Take OM Ratis snapshot. Write the snapshot index to file. Snapshot index
-   * is the log index corresponding to the last applied transaction on the OM
-   * State Machine.
+   * Take OM Ratis snapshot is a dummy operation as when double buffer
+   * flushes the lastAppliedIndex is flushed to DB and that is used as
+   * snapshot index.
    *
    * @return the last applied index on the state machine which has been
    * stored in the snapshot file.
    */
   @Override
   public long takeSnapshot() throws IOException {
-    LOG.info("Saving Ratis snapshot on the OM.");
-    if (ozoneManager != null) {
-      return ozoneManager.saveRatisSnapshot().getIndex();
-    }
-    return 0;
+    LOG.info("Current Snapshot Index {}", getLastAppliedTermIndex());
+    return getLastAppliedTermIndex().getIndex();

Review comment:
       We should return the flushed TransactionInfo#logIndex here too. The 
index returned via getLastAppliedTermIndex() might not have been flushed to the 
DB yet, and logs could be purged up to this index. If the OM crashes before the 
transactions are flushed to disk by OMDoubleBuffer, there could be data loss.
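   A minimal model of the concern, with hypothetical names: the state machine tracks both the last applied index (in memory) and the last flushed index (persisted by the double buffer), and takeSnapshot reports only the flushed one, so Ratis is never told it may purge entries whose effects are not yet on disk.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical model of applied vs. flushed indexes in the OM state machine. */
final class SnapshotIndexModel {
  // Advanced when a transaction is applied to the in-memory cache.
  private final AtomicLong lastAppliedIndex = new AtomicLong(-1);
  // Advanced only after the double buffer has flushed the batch to the DB.
  private final AtomicLong lastFlushedIndex = new AtomicLong(-1);

  void applied(long index) {
    lastAppliedIndex.accumulateAndGet(index, Math::max);
  }

  void flushed(long index) {
    lastFlushedIndex.accumulateAndGet(index, Math::max);
  }

  long lastApplied() {
    return lastAppliedIndex.get();
  }

  /**
   * Index reported to Ratis as the snapshot index. Logs may be purged up
   * to this index, so it must never exceed what is durable in the DB.
   */
  long takeSnapshot() {
    return lastFlushedIndex.get();
  }
}
```

   With transactions 1 to 10 applied but only 1 to 7 flushed, takeSnapshot() returns 7; returning 10 would allow entries 8 to 10 to be purged even though a crash at that point would lose them.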

##########
File path: 
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OMRatisSnapshotInfo.java
##########
@@ -49,81 +39,15 @@
   private volatile long term = 0;
   private volatile long snapshotIndex = -1;
 
-  private final File ratisSnapshotFile;
-
-  public OMRatisSnapshotInfo(File ratisDir) throws IOException {

Review comment:
       This class seems redundant now. getTermIndex() is only used in tests.
   There are two options:
   1. Update OMRatisSnapshotInfo on every DB flush, when transactionInfo is 
updated, to keep snapshotInfo in memory. But this update and the DB update 
would need to be atomic, and there is not much use for it anyway.
   2. Get rid of OMRatisSnapshotInfo and always read the snapshotIndex from 
the DB.
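   If option 1 were chosen, one way to avoid torn reads of the two separate volatile fields (term and snapshotIndex) is to publish an immutable pair through a single reference, set in the same critical section as the DB write. This is a sketch with hypothetical names; the Runnable stands in for the batch write that persists the transaction info. After a restart the cache would be rebuilt from the DB, so only in-process ordering matters here.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical in-memory snapshot info kept consistent with the DB. */
final class CachedSnapshotInfo {
  /** Immutable (term, index) pair, so readers never see a mix of old and new. */
  static final class TermIndexPair {
    final long term;
    final long index;

    TermIndexPair(long term, long index) {
      this.term = term;
      this.index = index;
    }
  }

  // Same initial values as OMRatisSnapshotInfo: term 0, index -1.
  private final AtomicReference<TermIndexPair> current =
      new AtomicReference<>(new TermIndexPair(0, -1));
  private final Object flushLock = new Object();

  /**
   * Persists the new value first, then publishes it. Holding flushLock keeps
   * concurrent flushes from publishing out of order.
   */
  void onFlush(long term, long index, Runnable persistToDb) {
    synchronized (flushLock) {
      persistToDb.run();  // if this throws, the cached value stays unchanged
      current.set(new TermIndexPair(term, index));
    }
  }

  TermIndexPair get() {
    return current.get();
  }
}
```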

##########
File path: 
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
##########
@@ -3027,30 +3017,59 @@ public TermIndex installSnapshot(String leaderId) {
     DBCheckpoint omDBcheckpoint = getDBCheckpointFromLeader(leaderId);
     Path newDBlocation = omDBcheckpoint.getCheckpointLocation();
 
-    // Check if current ratis log index is smaller than the downloaded
-    // snapshot index. If yes, proceed by stopping the ratis server so that
-    // the OM state can be re-initialized. If no, then do not proceed with
-    // installSnapshot.
     long lastAppliedIndex = omRatisServer.getLastAppliedTermIndex().getIndex();
-    long checkpointSnapshotIndex = omDBcheckpoint.getRatisSnapshotIndex();
-    long checkpointSnapshotTermIndex =
-        omDBcheckpoint.getRatisSnapshotTerm();
-    if (checkpointSnapshotIndex <= lastAppliedIndex) {
-      LOG.error("Failed to install checkpoint from OM leader: {}. The last " +
-          "applied index: {} is greater than or equal to the checkpoint's " +
-          "snapshot index: {}. Deleting the downloaded checkpoint {}", leaderId,
-          lastAppliedIndex, checkpointSnapshotIndex,
-          newDBlocation);
-      try {
-        FileUtils.deleteFully(newDBlocation);
-      } catch (IOException e) {
-        LOG.error("Failed to fully delete the downloaded DB checkpoint {} " +
-            "from OM leader {}.", newDBlocation,
-            leaderId, e);
+
+    // Check if current ratis log index is smaller than the downloaded
+    // checkpoint transaction index. If yes, proceed by stopping the ratis
+    // server so that the OM state can be re-initialized. If no, then do not
+    // proceed with installSnapshot.
+
+    OMTransactionInfo omTransactionInfo = null;
+    try {
+      // Set new DB location as DB path
+      OzoneConfiguration tempConfig = getConfiguration();
+
+      Path dbDir = newDBlocation.getParent();
+      if (dbDir != null) {
+        tempConfig.set(OZONE_OM_DB_DIRS, dbDir.toString());
+      } else {
+        LOG.error("Incorrect DB location path {} received from checkpoint.",
+            newDBlocation);
+        return null;
+      }
+
+      OMMetadataManager tempMetadataMgr =
+          new OmMetadataManagerImpl(configuration);
+
+      omTransactionInfo =
+          OMTransactionInfo.readTransactionInfo(tempMetadataMgr);
+      tempMetadataMgr.stop();
+

Review comment:
       Can we add an info log here? Something like "Downloaded checkpoint with 
OMTransactionInfo - {}". This would help with debugging, as the downloaded 
checkpoint's snapshotIndex is no longer logged in OMSnapshotProvider.

##########
File path: 
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
##########
@@ -139,6 +139,10 @@ public void initialize(RaftServer server, RaftGroupId id,
 
   @Override
   public SnapshotInfo getLatestSnapshot() {
+    TermIndex lastAppliedIndex = getLastAppliedTermIndex();

Review comment:
       getLatestSnapshot is used internally by Ratis during state reload. 
During reload or restart, getLastAppliedTermIndex would be (0, -1). The 
TransactionInfo stored in the DB should be returned here instead.
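   A sketch of the suggested behaviour, with hypothetical types: right after a restart the in-memory last applied index is still (0, -1), so the snapshot reported to Ratis comes from the persisted transaction info, falling back to (0, -1) only on a fresh OM with nothing in the DB. `TermAndIndex` stands in for Ratis' TermIndex/SnapshotInfo, and the Supplier stands in for reading OMTransactionInfo from the metadata manager.

```java
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical sketch of choosing the snapshot reported to Ratis. */
final class LatestSnapshotChooser {
  /** Minimal (term, index) value; stands in for Ratis' TermIndex. */
  static final class TermAndIndex {
    final long term;
    final long index;

    TermAndIndex(long term, long index) {
      this.term = term;
      this.index = index;
    }
  }

  static final TermAndIndex INITIAL = new TermAndIndex(0, -1);

  private final Supplier<Optional<TermAndIndex>> persistedTransactionInfo;

  LatestSnapshotChooser(Supplier<Optional<TermAndIndex>> persistedTransactionInfo) {
    this.persistedTransactionInfo = persistedTransactionInfo;
  }

  /**
   * Prefers the transaction info flushed to the OM DB; the in-memory last
   * applied index is deliberately not consulted here.
   */
  TermAndIndex getLatestSnapshot() {
    return persistedTransactionInfo.get().orElse(INITIAL);
  }
}
```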

##########
File path: 
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OzoneManagerSnapshotProvider.java
##########
@@ -112,16 +112,16 @@ public OzoneManagerSnapshotProvider(ConfigurationSource conf,
    */
   public DBCheckpoint getOzoneManagerDBSnapshot(String leaderOMNodeID)
       throws IOException {
-    String snapshotFileName = OM_SNAPSHOT_DB + "_" + System.currentTimeMillis();
-    File targetFile = new File(omSnapshotDir, snapshotFileName + ".tar.gz");
+    String snapshotTime = Long.toString(System.currentTimeMillis());
+    String snapshotFileName = Paths.get(omSnapshotDir.getAbsolutePath(),
+        snapshotTime, OM_DB_NAME).toFile().getAbsolutePath();
+    File targetFile = new File(snapshotFileName + ".tar.gz");

Review comment:
       Any reason for changing the naming convention here?
   The new naming scheme would be <OM_Ratis_Dir>/<Timestamp>/om.db. 
   I think it would be more readable to have the timestamp at the end of the 
file name instead of as a parent directory. What do you think?
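   For comparison, a sketch of the two layouts. The om.db name comes from OM_DB_NAME in the diff; the DB name is passed as a parameter, and `SnapshotNames` and both methods are hypothetical.

```java
import java.io.File;
import java.nio.file.Paths;

/** Hypothetical helpers contrasting the two checkpoint tarball layouts. */
final class SnapshotNames {
  /** Layout in this PR: <snapshotDir>/<timestamp>/<dbName>.tar.gz */
  static File timestampAsParentDir(File snapshotDir, long timestamp, String dbName) {
    return new File(Paths.get(snapshotDir.getAbsolutePath(),
        Long.toString(timestamp), dbName).toString() + ".tar.gz");
  }

  /** Suggested layout: <snapshotDir>/<dbName>_<timestamp>.tar.gz */
  static File timestampInFileName(File snapshotDir, long timestamp, String dbName) {
    return new File(snapshotDir, dbName + "_" + timestamp + ".tar.gz");
  }
}
```

   With snapshotDir /data/om/snapshot and timestamp 1591000000000 (example values), the first yields /data/om/snapshot/1591000000000/om.db.tar.gz and the second /data/om/snapshot/om.db_1591000000000.tar.gz. One possible reason for the new layout: installSnapshot points OZONE_OM_DB_DIRS at newDBlocation.getParent(), which only works if the extracted checkpoint directory is itself named om.db.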
   

##########
File path: 
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneManagerSnapshotProvider.java
##########
@@ -111,23 +113,43 @@ public void testDownloadCheckpoint() throws Exception {
         .getFailoverProxyProvider(objectStore.getClientProxy())
         .getCurrentProxyOMNodeId();
 
-    OzoneManager ozoneManager = cluster.getOzoneManager(leaderOMNodeId);
+    OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId);
 
     // Get a follower OM
-    String followerNodeId = ozoneManager.getPeerNodes().get(0).getOMNodeId();
+    String followerNodeId = leaderOM.getPeerNodes().get(0).getOMNodeId();
     OzoneManager followerOM = cluster.getOzoneManager(followerNodeId);
 
     // Download latest checkpoint from leader OM to follower OM
     DBCheckpoint omSnapshot = followerOM.getOmSnapshotProvider()
         .getOzoneManagerDBSnapshot(leaderOMNodeId);
 
-    long leaderSnapshotIndex = ozoneManager.getRatisSnapshotIndex();
-    long downloadedSnapshotIndex = omSnapshot.getRatisSnapshotIndex();
+    long leaderSnapshotIndex =
+        OMTransactionInfo.readTransactionInfo(leaderOM.getMetadataManager())
+            .getTransactionIndex();

Review comment:
       Can we not call the OzoneManager#getRatisSnapshotIndex() method here?

##########
File path: 
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
##########
@@ -3073,9 +3092,8 @@ public TermIndex installSnapshot(String leaderId) {
     // Restart (unpause) the state machine and update its last applied index
     // to the installed checkpoint's snapshot index.
     try {
-      reloadOMState(checkpointSnapshotIndex, checkpointSnapshotTermIndex);
-      omRatisServer.getOmStateMachine().unpause(checkpointSnapshotIndex,
-          checkpointSnapshotTermIndex);
+      reloadOMState(leaderIndex, leaderTerm);
+      omRatisServer.getOmStateMachine().unpause(leaderIndex, leaderTerm);

Review comment:
       Question: In case of a failure after the OM is paused, should we 
unpause with the old DB?
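   A hedged sketch of what a rollback could look like, assuming the install swaps DB directories by renaming: the current DB is moved aside instead of deleted, and if loading the new checkpoint fails it is moved back so the state machine can be unpaused at the old DB's index. `DbSwapper`, the `Loader` callback and the ".bak" suffix are hypothetical; the real code would also have to reload the metadata manager and Ratis state.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical rollback-on-failure sketch for swapping in a downloaded checkpoint. */
final class DbSwapper {
  /** Opens the DB in dbDir and returns its last applied transaction index. */
  interface Loader {
    long load(Path dbDir) throws IOException;
  }

  /**
   * Moves checkpoint into place as currentDb and loads it. If that fails, the
   * old DB is restored and reloaded instead. Returns the index the state
   * machine should be unpaused with. On success the ".bak" copy of the old DB
   * is left for the caller to delete.
   */
  static long install(Path currentDb, Path checkpoint, Loader loader) throws IOException {
    Path backup = currentDb.resolveSibling(currentDb.getFileName() + ".bak");
    // Keep the old DB instead of deleting it, so it can be restored.
    Files.move(currentDb, backup, StandardCopyOption.ATOMIC_MOVE);
    try {
      Files.move(checkpoint, currentDb, StandardCopyOption.ATOMIC_MOVE);
      return loader.load(currentDb);
    } catch (IOException | RuntimeException e) {
      System.err.println("Installing checkpoint failed, restoring old DB: " + e);
      // Put the checkpoint back where it was, if it had already been moved.
      if (Files.exists(currentDb)) {
        Files.move(currentDb, checkpoint, StandardCopyOption.ATOMIC_MOVE);
      }
      Files.move(backup, currentDb, StandardCopyOption.ATOMIC_MOVE);
      return loader.load(currentDb);
    }
  }
}
```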






