Murtadha Hubail has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1871

Change subject: [ASTERIXDB-1969][STO] Ignore corrupted checkpoints
......................................................................

[ASTERIXDB-1969][STO] Ignore corrupted checkpoints

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Ignore and delete corrupted checkpoint files.
- If all checkpoint files are corrupted, force a full recovery starting from the beginning of the log.
- Add a test for the new CheckpointManager behavior.
- Remove the unused IRecoveryManager.startRecovery method.

Change-Id: Ied8a188501b63a0d339e6391cac684e3378f4c37
---
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java
4 files changed, 92 insertions(+), 48 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/71/1871/1

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 275b055..10af9ff 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -158,27 +158,6 @@
         return state;
     }
 
-    //This method is used only when replication is disabled.
-    @Override
-    public void startRecovery(boolean synchronous) throws IOException, 
ACIDException {
-        state = SystemState.RECOVERING;
-        LOGGER.log(Level.INFO, "starting recovery ...");
-
-        long readableSmallestLSN = logMgr.getReadableSmallestLSN();
-        Checkpoint checkpointObject = checkpointManager.getLatest();
-        long lowWaterMarkLSN = checkpointObject.getMinMCTFirstLsn();
-        if (lowWaterMarkLSN < readableSmallestLSN) {
-            lowWaterMarkLSN = readableSmallestLSN;
-        }
-
-        //delete any recovery files from previous failed recovery attempts
-        deleteRecoveryTemporaryFiles();
-
-        //get active partitions on this node
-        Set<Integer> activePartitions = 
localResourceRepository.getNodeOrignalPartitions();
-        replayPartitionsLogs(activePartitions, logMgr.getLogReader(true), 
lowWaterMarkLSN);
-    }
-
     @Override
     public void startLocalRecovery(Set<Integer> partitions) throws 
IOException, ACIDException {
         state = SystemState.RECOVERING;
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
index d2bf3d3..370205b 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.test.logging;
 
 import java.io.File;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -32,10 +33,12 @@
 import org.apache.asterix.common.configuration.AsterixConfiguration;
 import org.apache.asterix.common.configuration.Property;
 import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
+import org.apache.asterix.common.transactions.Checkpoint;
 import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ICheckpointManager;
 import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.external.util.DataflowUtils;
 import org.apache.asterix.file.StorageComponentProvider;
 import org.apache.asterix.metadata.entities.Dataset;
@@ -47,6 +50,7 @@
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.test.common.TestHelper;
 import org.apache.asterix.transaction.management.service.logging.LogManager;
+import 
org.apache.asterix.transaction.management.service.recovery.AbstractCheckpointManager;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -208,4 +212,50 @@
             Assert.fail(e.getMessage());
         }
     }
+
+    @Test
+    public void testCorruptedCheckpointFiles() {
+        try {
+            TestNodeController nc = new TestNodeController(new 
File(TEST_CONFIG_FILE_PATH).getAbsolutePath(), false);
+            nc.init();
+            final ITransactionSubsystem txnSubsystem = 
nc.getTransactionSubsystem();
+            final AbstractCheckpointManager checkpointManager =
+                    (AbstractCheckpointManager) 
txnSubsystem.getCheckpointManager();
+            // Make a checkpoint with the current minFirstLSN
+            final long minFirstLSN = 
txnSubsystem.getRecoveryManager().getMinFirstLSN();
+            checkpointManager.tryCheckpoint(minFirstLSN);
+            // Get the just created checkpoint
+            final Checkpoint validCheckpoint = checkpointManager.getLatest();
+            // Make sure the valid checkpoint wouldn't force full recovery
+            Assert.assertTrue(validCheckpoint.getMinMCTFirstLsn() >= 
minFirstLSN);
+            // Add a corrupted (empty) checkpoint file with a timestamp greater than 
the current checkpoint's
+            Path corruptedCheckpointPath = 
checkpointManager.getCheckpointPath(validCheckpoint.getTimeStamp() + 1);
+            File corruptedCheckpoint = corruptedCheckpointPath.toFile();
+            Assert.assertTrue(corruptedCheckpoint.createNewFile());
+            // Make sure the corrupted checkpoint file was created
+            Assert.assertTrue(corruptedCheckpoint.exists());
+            // Try to get the latest checkpoint again
+            Checkpoint cpAfterCorruption = checkpointManager.getLatest();
+            // Make sure the valid checkpoint was returned
+            Assert.assertEquals(validCheckpoint.getTimeStamp(), 
cpAfterCorruption.getTimeStamp());
+            // Make sure the corrupted checkpoint file was deleted
+            Assert.assertFalse(corruptedCheckpoint.exists());
+            // Corrupt the valid checkpoint by replacing its content
+            final Path validCheckpointPath = 
checkpointManager.getCheckpointPath(validCheckpoint.getTimeStamp());
+            File validCheckpointFile = validCheckpointPath.toFile();
+            Assert.assertTrue(validCheckpointFile.exists());
+            // Delete the valid checkpoint file and create it as an empty file
+            Assert.assertTrue(validCheckpointFile.delete());
+            Assert.assertTrue(validCheckpointFile.createNewFile());
+            // Make sure the returned checkpoint (the forged checkpoint) will 
force full recovery
+            Checkpoint forgedCheckpoint = checkpointManager.getLatest();
+            Assert.assertTrue(forgedCheckpoint.getMinMCTFirstLsn() < 
minFirstLSN);
+            // Make sure the forged checkpoint recovery will start from the 
first available log
+            final long readableSmallestLSN = 
txnSubsystem.getLogManager().getReadableSmallestLSN();
+            Assert.assertTrue(forgedCheckpoint.getMinMCTFirstLsn() <= 
readableSmallestLSN);
+        } catch (Throwable e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+    }
 }
\ No newline at end of file
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
index 84e1019..13970d1 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
@@ -33,7 +33,7 @@
  */
 public interface IRecoveryManager {
 
-    public enum SystemState {
+    enum SystemState {
         BOOTSTRAPPING, // The first time the NC is bootstrapped.
         PERMANENT_DATA_LOSS, // No checkpoint files found on NC and it is not 
BOOTSTRAPPING (data loss).
         RECOVERING, // Recovery process is on-going.
@@ -41,7 +41,7 @@
         CORRUPTED // Some txn logs need to be replayed (need to perform 
recover).
     }
 
-    public class ResourceType {
+    class ResourceType {
         public static final byte LSM_BTREE = 0;
         public static final byte LSM_RTREE = 1;
         public static final byte LSM_INVERTED_INDEX = 2;
@@ -61,38 +61,25 @@
     SystemState getSystemState() throws ACIDException;
 
     /**
-     * Initiates a crash recovery.
-     *
-     * @param synchronous
-     *            indicates if the recovery is to be done in a synchronous
-     *            manner. In asynchronous mode, the recovery will happen as 
part
-     *            of a separate thread.
-     * @return SystemState the state of the system (@see SystemState) post
-     *         recovery.
-     * @throws ACIDException
-     */
-    public void startRecovery(boolean synchronous) throws IOException, 
ACIDException;
-
-    /**
      * Rolls back a transaction.
      *
      * @param txnContext
      *            the transaction context associated with the transaction
      * @throws ACIDException
      */
-    public void rollbackTransaction(ITransactionContext txnContext) throws 
ACIDException;
+    void rollbackTransaction(ITransactionContext txnContext) throws 
ACIDException;
 
     /**
      * @return min first LSN of the open indexes (including remote indexes if 
replication is enabled)
      * @throws HyracksDataException
      */
-    public long getMinFirstLSN() throws HyracksDataException;
+    long getMinFirstLSN() throws HyracksDataException;
 
     /**
      * @return min first LSN of the open indexes
      * @throws HyracksDataException
      */
-    public long getLocalMinFirstLSN() throws HyracksDataException;
+    long getLocalMinFirstLSN() throws HyracksDataException;
 
     /**
      * Replay the logs that belong to the passed {@code partitions} starting 
from the {@code lowWaterMarkLSN}
@@ -102,7 +89,7 @@
      * @throws IOException
      * @throws ACIDException
      */
-    public void replayPartitionsLogs(Set<Integer> partitions, ILogReader 
logReader, long lowWaterMarkLSN)
+    void replayPartitionsLogs(Set<Integer> partitions, ILogReader logReader, 
long lowWaterMarkLSN)
             throws IOException, ACIDException;
 
     /**
@@ -114,12 +101,12 @@
      * @throws IOException
      *             if the file for the specified {@code jobId} with the {@code 
fileName} already exists
      */
-    public File createJobRecoveryFile(int jobId, String fileName) throws 
IOException;
+    File createJobRecoveryFile(int jobId, String fileName) throws IOException;
 
     /**
      * Deletes all temporary recovery files
      */
-    public void deleteRecoveryTemporaryFiles();
+    void deleteRecoveryTemporaryFiles();
 
     /**
      * Performs the local recovery process on {@code partitions}
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java
index 8d3e0a7..24d316b 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java
@@ -88,13 +88,26 @@
         List<Checkpoint> checkpointObjectList = new ArrayList<>();
         for (File file : checkpoints) {
             try {
-                LOGGER.log(Level.WARNING, "Reading snapshot file: " + 
file.getAbsolutePath());
+                LOGGER.log(Level.WARNING, "Reading checkpoint file: " + 
file.getAbsolutePath());
                 String jsonString = new 
String(Files.readAllBytes(Paths.get(file.getAbsolutePath())));
                 checkpointObjectList.add(Checkpoint.fromJson(jsonString));
             } catch (IOException e) {
-                throw new ACIDException("Failed to read a checkpoint file", e);
+                // ignore corrupted checkpoint file
+                LOGGER.log(Level.WARNING, "Failed to read checkpoint file: " + 
file.getAbsolutePath(), e);
+                file.delete();
+                LOGGER.log(Level.INFO, "Deleted corrupted checkpoint file: " + 
file.getAbsolutePath());
             }
         }
+        /**
+         * If all checkpoint files are corrupted, we have no option but to try 
to perform recovery.
+         * We will forge a checkpoint that forces recovery to start from the 
beginning of the log.
+         * This shouldn't happen unless a hardware corruption happens.
+         */
+        if (checkpointObjectList.isEmpty()) {
+            LOGGER.severe("All checkpoint files are corrupted. Forcing 
recovery from the beginning of the log");
+            checkpointObjectList.add(forgeForceRecoveryCheckpoint());
+        }
+
         // Sort checkpointObjects in descending order by timeStamp to find out 
the most recent one.
         Collections.sort(checkpointObjectList);
 
@@ -125,6 +138,11 @@
         // Nothing to dump
     }
 
+    public Path getCheckpointPath(long checkpointTimestamp) {
+        return Paths.get(checkpointDir.getAbsolutePath(),
+                CHECKPOINT_FILENAME_PREFIX + Long.toString(checkpointTimestamp));
+    }
+
     protected void capture(long minMCTFirstLSN, boolean sharp) throws 
HyracksDataException {
         ILogManager logMgr = txnSubsystem.getLogManager();
         ITransactionManager txnMgr = txnSubsystem.getTransactionManager();
@@ -134,14 +152,24 @@
         cleanup();
     }
 
+    protected Checkpoint forgeForceRecoveryCheckpoint() {
+        /*
+         * Both LSN arguments are set to Long.MIN_VALUE so the low watermark 
(min MCT first LSN) precedes every log record and recovery starts from
+         * the first available log.
+         * The storage version is set to the current version. If there is a 
version mismatch, it will be detected
+         * during recovery.
+         */
+        return new Checkpoint(Long.MIN_VALUE, Long.MIN_VALUE, 
Integer.MIN_VALUE, System.currentTimeMillis(), false,
+                StorageConstants.VERSION);
+    }
+
     private void persist(Checkpoint checkpoint) throws HyracksDataException {
-        // Construct checkpoint file name
-        String fileName = checkpointDir.getAbsolutePath() + File.separator + 
CHECKPOINT_FILENAME_PREFIX
-                + Long.toString(checkpoint.getTimeStamp());
+        // Get checkpoint file path
+        Path path = getCheckpointPath(checkpoint.getTimeStamp());
         // Write checkpoint file to disk
-        Path path = Paths.get(fileName);
         try (BufferedWriter writer = Files.newBufferedWriter(path)) {
             writer.write(checkpoint.asJson());
+            writer.flush();
         } catch (IOException e) {
             throw new HyracksDataException("Failed to write checkpoint to 
disk", e);
         }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1871
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ied8a188501b63a0d339e6391cac684e3378f4c37
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <[email protected]>

Reply via email to