Murtadha Hubail has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2959

Change subject: [NO ISSUE][TX] Create New Log File Before Deleting Old Files
......................................................................

[NO ISSUE][TX] Create New Log File Before Deleting Old Files

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Ensure next log file is created after a sharp check
  point before deleting old files. This is to prevent
  the case if a crash happens right after deleting the
  old files but before creating the new one, then the
  next time the system starts up, it will start with
  log file id 0 which is wrong.
- Log the details of latest index checkpoint when the
  low watermark of the new checkpoint is less than the
  low watermakr of the latest checkpoint.

Change-Id: I4817f697b43daff55726909ab074ec30a1c224ce
---
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
4 files changed, 70 insertions(+), 69 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/59/2959/1

diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
index 22dec60..3b4c58a 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
@@ -65,7 +65,6 @@
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.stubbing.Answer;
 
 public class CheckpointingTest {
 
@@ -134,7 +133,7 @@
                 ICheckpointManager checkpointManager = 
nc.getTransactionSubsystem().getCheckpointManager();
                 LogManager logManager = (LogManager) 
nc.getTransactionSubsystem().getLogManager();
                 // Number of log files after node startup should be one
-                int numberOfLogFiles = logManager.getLogFileIds().size();
+                int numberOfLogFiles = logManager.getSortedLogFileIds().size();
                 Assert.assertEquals(1, numberOfLogFiles);
 
                 // Low-water mark LSN
@@ -142,10 +141,10 @@
                 // Low-water mark log file id
                 long initialLowWaterMarkFileId = 
logManager.getLogFileId(lowWaterMarkLSN);
                 // Initial Low-water mark should be in the only available log 
file
-                Assert.assertEquals(initialLowWaterMarkFileId, 
logManager.getLogFileIds().get(0).longValue());
+                Assert.assertEquals(initialLowWaterMarkFileId, 
logManager.getSortedLogFileIds().get(0).longValue());
 
                 // Insert records until a new log file is created
-                while (logManager.getLogFileIds().size() == 1) {
+                while (logManager.getSortedLogFileIds().size() == 1) {
                     ITupleReference tuple = tupleGenerator.next();
                     DataflowUtils.addTupleToFrame(tupleAppender, tuple, 
insertOp);
                 }
@@ -160,9 +159,9 @@
                      * the low-water mark is still in it (i.e. it is still 
required for
                      * recovery)
                      */
-                    int numberOfLogFilesBeforeCheckpoint = 
logManager.getLogFileIds().size();
+                    int numberOfLogFilesBeforeCheckpoint = 
logManager.getSortedLogFileIds().size();
                     checkpointManager.tryCheckpoint(logManager.getAppendLSN());
-                    int numberOfLogFilesAfterCheckpoint = 
logManager.getLogFileIds().size();
+                    int numberOfLogFilesAfterCheckpoint = 
logManager.getSortedLogFileIds().size();
                     Assert.assertEquals(numberOfLogFilesBeforeCheckpoint, 
numberOfLogFilesAfterCheckpoint);
 
                     /*
@@ -203,7 +202,7 @@
 
                     checkpointManager.tryCheckpoint(lowWaterMarkLSN);
                     // Validate initialLowWaterMarkFileId was deleted
-                    for (Long fileId : logManager.getLogFileIds()) {
+                    for (Long fileId : logManager.getSortedLogFileIds()) {
                         Assert.assertNotEquals(initialLowWaterMarkFileId, 
fileId.longValue());
                     }
 
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
index 4453a1d..dad2676 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
@@ -147,7 +147,7 @@
     public void interruptedLogFileSwitch() throws Exception {
         final INcApplicationContext ncAppCtx = (INcApplicationContext) 
integrationUtil.ncs[0].getApplicationContext();
         final LogManager logManager = (LogManager) 
ncAppCtx.getTransactionSubsystem().getLogManager();
-        int logFileCountBeforeInterrupt = logManager.getLogFileIds().size();
+        int logFileCountBeforeInterrupt = 
logManager.getSortedLogFileIds().size();
 
         // ensure an interrupted transactor will create next log file but will 
fail to position the log channel
         final AtomicBoolean failed = new AtomicBoolean(false);
@@ -162,7 +162,7 @@
         interruptedTransactor.start();
         interruptedTransactor.join();
         // ensure a new log file was created and survived interrupt
-        int logFileCountAfterInterrupt = logManager.getLogFileIds().size();
+        int logFileCountAfterInterrupt = 
logManager.getSortedLogFileIds().size();
         Assert.assertEquals(logFileCountBeforeInterrupt + 1, 
logFileCountAfterInterrupt);
         Assert.assertFalse(failed.get());
 
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
index f84167e..9654473 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
@@ -24,6 +24,8 @@
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -31,6 +33,7 @@
 
 public class IndexCheckpoint {
 
+    private static final Logger LOGGER = LogManager.getLogger();
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
     private static final long INITIAL_CHECKPOINT_ID = 0;
     private long id;
@@ -52,6 +55,9 @@
     public static IndexCheckpoint next(IndexCheckpoint latest, long 
lowWatermark, long validComponentSequence,
             long lastComponentId) {
         if (lowWatermark < latest.getLowWatermark()) {
+            if (LOGGER.isErrorEnabled()) {
+                LOGGER.error("low watermark {} less than the latest checkpoint 
low watermark {}", lowWatermark, latest);
+            }
             throw new IllegalStateException("Low watermark should always be 
increasing");
         }
         IndexCheckpoint next = new IndexCheckpoint();
@@ -104,4 +110,13 @@
             throw HyracksDataException.create(e);
         }
     }
+
+    @Override
+    public String toString() {
+        try {
+            return asJson();
+        } catch (HyracksDataException e) {
+            throw new IllegalStateException(e);
+        }
+    }
 }
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index c0d18df..24333ed 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -29,7 +29,6 @@
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -369,52 +368,45 @@
         long fileId = 0;
         long offset = 0;
         File fileLogDir = new File(logDir);
-        try {
-            if (fileLogDir.exists()) {
-                List<Long> logFileIds = getLogFileIds();
-                if (logFileIds.isEmpty()) {
-                    fileId = nextLogFileId;
-                    createFileIfNotExists(getLogFilePath(fileId));
-                    if (LOGGER.isInfoEnabled()) {
-                        LOGGER.info("created a log file: " + 
getLogFilePath(fileId));
-                    }
-                } else {
-                    fileId = logFileIds.get(logFileIds.size() - 1);
-                    File logFile = new File(getLogFilePath(fileId));
-                    offset = logFile.length();
-                }
-            } else {
+        if (fileLogDir.exists()) {
+            List<Long> logFileIds = getSortedLogFileIds();
+            if (logFileIds.isEmpty()) {
                 fileId = nextLogFileId;
-                createNewDirectory(logDir);
-                if (LOGGER.isInfoEnabled()) {
-                    LOGGER.info("created the log directory: " + 
logManagerProperties.getLogDir());
-                }
                 createFileIfNotExists(getLogFilePath(fileId));
-                if (LOGGER.isInfoEnabled()) {
-                    LOGGER.info("created a log file: " + 
getLogFilePath(fileId));
-                }
+            } else {
+                fileId = logFileIds.get(logFileIds.size() - 1);
+                File logFile = new File(getLogFilePath(fileId));
+                offset = logFile.length();
             }
-        } catch (IOException ioe) {
-            throw new IllegalStateException("Failed to initialize the log 
anchor", ioe);
+        } else {
+            fileId = nextLogFileId;
+            createNewDirectory(logDir);
+            if (LOGGER.isInfoEnabled()) {
+                LOGGER.info("created the log directory: " + 
logManagerProperties.getLogDir());
+            }
+            createFileIfNotExists(getLogFilePath(fileId));
         }
         if (LOGGER.isInfoEnabled()) {
             LOGGER.info("log file Id: " + fileId + ", offset: " + offset);
         }
-        return logFileSize * fileId + offset;
+        return getLogFileFirstLsn(fileId) + offset;
     }
 
     @Override
     public void renewLogFiles() {
         terminateLogFlusher();
         closeCurrentLogFile();
-        long lastMaxLogFileId = deleteAllLogFiles();
-        initializeLogManager(lastMaxLogFileId + 1);
+        long nextLogFileId = getNextLogFileId();
+        createFileIfNotExists(getLogFilePath(nextLogFileId));
+        final long logFileFirstLsn = getLogFileFirstLsn(nextLogFileId);
+        deleteOldLogFiles(logFileFirstLsn);
+        initializeLogManager(nextLogFileId);
     }
 
     @Override
     public void deleteOldLogFiles(long checkpointLSN) {
         Long checkpointLSNLogFileID = getLogFileId(checkpointLSN);
-        List<Long> logFileIds = getLogFileIds();
+        List<Long> logFileIds = getSortedLogFileIds();
         if (!logFileIds.isEmpty()) {
             //sort log files from oldest to newest
             Collections.sort(logFileIds);
@@ -461,24 +453,7 @@
         }
     }
 
-    private long deleteAllLogFiles() {
-        List<Long> logFileIds = getLogFileIds();
-        if (!logFileIds.isEmpty()) {
-            for (Long id : logFileIds) {
-                File file = new File(getLogFilePath(id));
-                LOGGER.info("Deleting log file: " + file.getAbsolutePath());
-                if (!file.delete()) {
-                    throw new IllegalStateException("Failed to delete a file: 
" + file.getAbsolutePath());
-                }
-                LOGGER.info("log file: " + file.getAbsolutePath() + " was 
deleted successfully");
-            }
-            return logFileIds.get(logFileIds.size() - 1);
-        } else {
-            throw new IllegalStateException("Couldn't find any log files.");
-        }
-    }
-
-    public List<Long> getLogFileIds() {
+    public List<Long> getSortedLogFileIds() {
         File fileLogDir = new File(logDir);
         String[] logFileNames = null;
         List<Long> logFileIds = null;
@@ -510,12 +485,7 @@
         for (String fileName : logFileNames) {
             
logFileIds.add(Long.parseLong(fileName.substring(logFilePrefix.length() + 1)));
         }
-        Collections.sort(logFileIds, new Comparator<Long>() {
-            @Override
-            public int compare(Long arg0, Long arg1) {
-                return arg0.compareTo(arg1);
-            }
-        });
+        logFileIds.sort(Long::compareTo);
         return logFileIds;
     }
 
@@ -531,13 +501,18 @@
         return lsn / logFileSize;
     }
 
-    private static boolean createFileIfNotExists(String path) throws 
IOException {
-        File file = new File(path);
-        File parentFile = file.getParentFile();
-        if (parentFile != null) {
-            parentFile.mkdirs();
+    private static void createFileIfNotExists(String path) {
+        try {
+            File file = new File(path);
+            File parentFile = file.getParentFile();
+            if (parentFile != null) {
+                parentFile.mkdirs();
+            }
+            Files.createFile(file.toPath());
+            LOGGER.info("Created log file {}", path);
+        } catch (IOException e) {
+            throw new IllegalStateException("File to create file in " + path, 
e);
         }
-        return file.createNewFile();
     }
 
     private static boolean createNewDirectory(String path) {
@@ -579,7 +554,7 @@
 
     @Override
     public long getReadableSmallestLSN() {
-        List<Long> logFileIds = getLogFileIds();
+        List<Long> logFileIds = getSortedLogFileIds();
         if (!logFileIds.isEmpty()) {
             return logFileIds.get(0) * logFileSize;
         } else {
@@ -629,6 +604,18 @@
         fileChannel.close();
     }
 
+    private long getNextLogFileId() {
+        final List<Long> logFileIds = getSortedLogFileIds();
+        if (logFileIds.isEmpty()) {
+            return 0;
+        }
+        return logFileIds.get(logFileIds.size() - 1) + 1;
+    }
+
+    private long getLogFileFirstLsn(long logFileId) {
+        return logFileId * logFileSize;
+    }
+
     /**
      * This class is used to log FLUSH logs.
      * FLUSH logs are flushed on a different thread to avoid a possible 
deadlock in {@link LogBuffer} batchUnlock

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2959
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I4817f697b43daff55726909ab074ec30a1c224ce
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: stabilization-f69489
Gerrit-Owner: Murtadha Hubail <mhub...@apache.org>

Reply via email to