Murtadha Hubail has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/2959
Change subject: [NO ISSUE][TX] Create New Log File Before Deleting Old Files ...................................................................... [NO ISSUE][TX] Create New Log File Before Deleting Old Files - user model changes: no - storage format changes: no - interface changes: no Details: - Ensure the next log file is created after a sharp checkpoint before deleting old files. This prevents the case where a crash happens right after deleting the old files but before creating the new one, in which case the next time the system starts up it would start with log file id 0, which is wrong. - Log the details of the latest index checkpoint when the low watermark of the new checkpoint is less than the low watermark of the latest checkpoint. Change-Id: I4817f697b43daff55726909ab074ec30a1c224ce --- M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java 4 files changed, 70 insertions(+), 69 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/59/2959/1 diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java index 22dec60..3b4c58a 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java @@ -65,7 +65,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.mockito.stubbing.Answer; public class CheckpointingTest { @@ -134,7 +133,7 @@ ICheckpointManager checkpointManager = 
nc.getTransactionSubsystem().getCheckpointManager(); LogManager logManager = (LogManager) nc.getTransactionSubsystem().getLogManager(); // Number of log files after node startup should be one - int numberOfLogFiles = logManager.getLogFileIds().size(); + int numberOfLogFiles = logManager.getSortedLogFileIds().size(); Assert.assertEquals(1, numberOfLogFiles); // Low-water mark LSN @@ -142,10 +141,10 @@ // Low-water mark log file id long initialLowWaterMarkFileId = logManager.getLogFileId(lowWaterMarkLSN); // Initial Low-water mark should be in the only available log file - Assert.assertEquals(initialLowWaterMarkFileId, logManager.getLogFileIds().get(0).longValue()); + Assert.assertEquals(initialLowWaterMarkFileId, logManager.getSortedLogFileIds().get(0).longValue()); // Insert records until a new log file is created - while (logManager.getLogFileIds().size() == 1) { + while (logManager.getSortedLogFileIds().size() == 1) { ITupleReference tuple = tupleGenerator.next(); DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp); } @@ -160,9 +159,9 @@ * the low-water mark is still in it (i.e. 
it is still required for * recovery) */ - int numberOfLogFilesBeforeCheckpoint = logManager.getLogFileIds().size(); + int numberOfLogFilesBeforeCheckpoint = logManager.getSortedLogFileIds().size(); checkpointManager.tryCheckpoint(logManager.getAppendLSN()); - int numberOfLogFilesAfterCheckpoint = logManager.getLogFileIds().size(); + int numberOfLogFilesAfterCheckpoint = logManager.getSortedLogFileIds().size(); Assert.assertEquals(numberOfLogFilesBeforeCheckpoint, numberOfLogFilesAfterCheckpoint); /* @@ -203,7 +202,7 @@ checkpointManager.tryCheckpoint(lowWaterMarkLSN); // Validate initialLowWaterMarkFileId was deleted - for (Long fileId : logManager.getLogFileIds()) { + for (Long fileId : logManager.getSortedLogFileIds()) { Assert.assertNotEquals(initialLowWaterMarkFileId, fileId.longValue()); } diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java index 4453a1d..dad2676 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java @@ -147,7 +147,7 @@ public void interruptedLogFileSwitch() throws Exception { final INcApplicationContext ncAppCtx = (INcApplicationContext) integrationUtil.ncs[0].getApplicationContext(); final LogManager logManager = (LogManager) ncAppCtx.getTransactionSubsystem().getLogManager(); - int logFileCountBeforeInterrupt = logManager.getLogFileIds().size(); + int logFileCountBeforeInterrupt = logManager.getSortedLogFileIds().size(); // ensure an interrupted transactor will create next log file but will fail to position the log channel final AtomicBoolean failed = new AtomicBoolean(false); @@ -162,7 +162,7 @@ interruptedTransactor.start(); interruptedTransactor.join(); // ensure a new log file was created and survived interrupt - int logFileCountAfterInterrupt = 
logManager.getLogFileIds().size(); + int logFileCountAfterInterrupt = logManager.getSortedLogFileIds().size(); Assert.assertEquals(logFileCountBeforeInterrupt + 1, logFileCountAfterInterrupt); Assert.assertFalse(failed.get()); diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java index f84167e..9654473 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java @@ -24,6 +24,8 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.core.JsonProcessingException; @@ -31,6 +33,7 @@ public class IndexCheckpoint { + private static final Logger LOGGER = LogManager.getLogger(); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final long INITIAL_CHECKPOINT_ID = 0; private long id; @@ -52,6 +55,9 @@ public static IndexCheckpoint next(IndexCheckpoint latest, long lowWatermark, long validComponentSequence, long lastComponentId) { if (lowWatermark < latest.getLowWatermark()) { + if (LOGGER.isErrorEnabled()) { + LOGGER.error("low watermark {} less than the latest checkpoint low watermark {}", lowWatermark, latest); + } throw new IllegalStateException("Low watermark should always be increasing"); } IndexCheckpoint next = new IndexCheckpoint(); @@ -104,4 +110,13 @@ throw HyracksDataException.create(e); } } + + @Override + public String toString() { + try { + return asJson(); + } catch (HyracksDataException e) { + throw new IllegalStateException(e); + } + } } diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java index c0d18df..24333ed 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java @@ -29,7 +29,6 @@ import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collections; -import java.util.Comparator; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -369,52 +368,45 @@ long fileId = 0; long offset = 0; File fileLogDir = new File(logDir); - try { - if (fileLogDir.exists()) { - List<Long> logFileIds = getLogFileIds(); - if (logFileIds.isEmpty()) { - fileId = nextLogFileId; - createFileIfNotExists(getLogFilePath(fileId)); - if (LOGGER.isInfoEnabled()) { - LOGGER.info("created a log file: " + getLogFilePath(fileId)); - } - } else { - fileId = logFileIds.get(logFileIds.size() - 1); - File logFile = new File(getLogFilePath(fileId)); - offset = logFile.length(); - } - } else { + if (fileLogDir.exists()) { + List<Long> logFileIds = getSortedLogFileIds(); + if (logFileIds.isEmpty()) { fileId = nextLogFileId; - createNewDirectory(logDir); - if (LOGGER.isInfoEnabled()) { - LOGGER.info("created the log directory: " + logManagerProperties.getLogDir()); - } createFileIfNotExists(getLogFilePath(fileId)); - if (LOGGER.isInfoEnabled()) { - LOGGER.info("created a log file: " + getLogFilePath(fileId)); - } + } else { + fileId = logFileIds.get(logFileIds.size() - 1); + File logFile = new File(getLogFilePath(fileId)); + offset = logFile.length(); } - } catch (IOException ioe) { - throw new IllegalStateException("Failed to initialize the log anchor", ioe); + } else { + fileId = 
nextLogFileId; + createNewDirectory(logDir); + if (LOGGER.isInfoEnabled()) { + LOGGER.info("created the log directory: " + logManagerProperties.getLogDir()); + } + createFileIfNotExists(getLogFilePath(fileId)); } if (LOGGER.isInfoEnabled()) { LOGGER.info("log file Id: " + fileId + ", offset: " + offset); } - return logFileSize * fileId + offset; + return getLogFileFirstLsn(fileId) + offset; } @Override public void renewLogFiles() { terminateLogFlusher(); closeCurrentLogFile(); - long lastMaxLogFileId = deleteAllLogFiles(); - initializeLogManager(lastMaxLogFileId + 1); + long nextLogFileId = getNextLogFileId(); + createFileIfNotExists(getLogFilePath(nextLogFileId)); + final long logFileFirstLsn = getLogFileFirstLsn(nextLogFileId); + deleteOldLogFiles(logFileFirstLsn); + initializeLogManager(nextLogFileId); } @Override public void deleteOldLogFiles(long checkpointLSN) { Long checkpointLSNLogFileID = getLogFileId(checkpointLSN); - List<Long> logFileIds = getLogFileIds(); + List<Long> logFileIds = getSortedLogFileIds(); if (!logFileIds.isEmpty()) { //sort log files from oldest to newest Collections.sort(logFileIds); @@ -461,24 +453,7 @@ } } - private long deleteAllLogFiles() { - List<Long> logFileIds = getLogFileIds(); - if (!logFileIds.isEmpty()) { - for (Long id : logFileIds) { - File file = new File(getLogFilePath(id)); - LOGGER.info("Deleting log file: " + file.getAbsolutePath()); - if (!file.delete()) { - throw new IllegalStateException("Failed to delete a file: " + file.getAbsolutePath()); - } - LOGGER.info("log file: " + file.getAbsolutePath() + " was deleted successfully"); - } - return logFileIds.get(logFileIds.size() - 1); - } else { - throw new IllegalStateException("Couldn't find any log files."); - } - } - - public List<Long> getLogFileIds() { + public List<Long> getSortedLogFileIds() { File fileLogDir = new File(logDir); String[] logFileNames = null; List<Long> logFileIds = null; @@ -510,12 +485,7 @@ for (String fileName : logFileNames) { 
logFileIds.add(Long.parseLong(fileName.substring(logFilePrefix.length() + 1))); } - Collections.sort(logFileIds, new Comparator<Long>() { - @Override - public int compare(Long arg0, Long arg1) { - return arg0.compareTo(arg1); - } - }); + logFileIds.sort(Long::compareTo); return logFileIds; } @@ -531,13 +501,18 @@ return lsn / logFileSize; } - private static boolean createFileIfNotExists(String path) throws IOException { - File file = new File(path); - File parentFile = file.getParentFile(); - if (parentFile != null) { - parentFile.mkdirs(); + private static void createFileIfNotExists(String path) { + try { + File file = new File(path); + File parentFile = file.getParentFile(); + if (parentFile != null) { + parentFile.mkdirs(); + } + Files.createFile(file.toPath()); + LOGGER.info("Created log file {}", path); + } catch (IOException e) { + throw new IllegalStateException("File to create file in " + path, e); } - return file.createNewFile(); } private static boolean createNewDirectory(String path) { @@ -579,7 +554,7 @@ @Override public long getReadableSmallestLSN() { - List<Long> logFileIds = getLogFileIds(); + List<Long> logFileIds = getSortedLogFileIds(); if (!logFileIds.isEmpty()) { return logFileIds.get(0) * logFileSize; } else { @@ -629,6 +604,18 @@ fileChannel.close(); } + private long getNextLogFileId() { + final List<Long> logFileIds = getSortedLogFileIds(); + if (logFileIds.isEmpty()) { + return 0; + } + return logFileIds.get(logFileIds.size() - 1) + 1; + } + + private long getLogFileFirstLsn(long logFileId) { + return logFileId * logFileSize; + } + /** * This class is used to log FLUSH logs. 
* FLUSH logs are flushed on a different thread to avoid a possible deadlock in {@link LogBuffer} batchUnlock -- To view, visit https://asterix-gerrit.ics.uci.edu/2959 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I4817f697b43daff55726909ab074ec30a1c224ce Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: stabilization-f69489 Gerrit-Owner: Murtadha Hubail <mhub...@apache.org>