Murtadha Hubail has submitted this change and it was merged.

Change subject: [ASTERIXDB-2444][STO] Avoid Using System Clock in Checkpoints
......................................................................
[ASTERIXDB-2444][STO] Avoid Using System Clock in Checkpoints

- user model changes: no
- storage format changes: yes
- interface changes: yes

Details:
- Replace the use of system-clock timestamps in checkpoints with a
  sequential checkpoint ID.
- Update the Asterix/Hyracks storage versions to reflect the recent
  storage changes.
- This change is expected to break storage backward compatibility.

Change-Id: Idc061e6eaccfb308b29a5a263b77a0a849694d4f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2961
Reviewed-by: Michael Blow <[email protected]>
Integration-Tests: Michael Blow <[email protected]>
Tested-by: Michael Blow <[email protected]>
---
M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
M hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITreeIndexFrame.java
7 files changed, 144 insertions(+), 149 deletions(-)

Approvals: Anon. E. 
Moose #1000171: Michael Blow: Looks good to me, approved; Verified; Verified diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java index 6009f51..e67246a 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java @@ -297,7 +297,7 @@ // Make sure the valid checkout wouldn't force full recovery Assert.assertTrue(validCheckpoint.getMinMCTFirstLsn() >= minFirstLSN); // Add a corrupted (empty) checkpoint file with a timestamp > than current checkpoint - Path corruptedCheckpointPath = checkpointManager.getCheckpointPath(validCheckpoint.getTimeStamp() + 1); + Path corruptedCheckpointPath = checkpointManager.getCheckpointPath(validCheckpoint.getId() + 1); File corruptedCheckpoint = corruptedCheckpointPath.toFile(); corruptedCheckpoint.createNewFile(); // Make sure the corrupted checkpoint file was created @@ -305,11 +305,11 @@ // Try to get the latest checkpoint again Checkpoint cpAfterCorruption = checkpointManager.getLatest(); // Make sure the valid checkpoint was returned - Assert.assertEquals(validCheckpoint.getTimeStamp(), cpAfterCorruption.getTimeStamp()); + Assert.assertEquals(validCheckpoint.getId(), cpAfterCorruption.getId()); // Make sure the corrupted checkpoint file was deleted Assert.assertFalse(corruptedCheckpoint.exists()); // Corrupt the valid checkpoint by replacing its content - final Path validCheckpointPath = checkpointManager.getCheckpointPath(validCheckpoint.getTimeStamp()); + final Path validCheckpointPath = checkpointManager.getCheckpointPath(validCheckpoint.getId()); File validCheckpointFile = validCheckpointPath.toFile(); Assert.assertTrue(validCheckpointFile.exists()); // Delete the valid checkpoint file and create it as an empty file diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java index 7f1a1c7..8fe0353 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java @@ -18,11 +18,12 @@ */ package org.apache.asterix.common.transactions; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.IJsonSerializable; import org.apache.hyracks.api.io.IPersistedResourceRegistry; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; public class Checkpoint implements Comparable<Checkpoint>, IJsonSerializable { @@ -30,16 +31,16 @@ private final long checkpointLsn; private final long minMCTFirstLsn; private final long maxTxnId; - private final long timeStamp; private final boolean sharp; private final int storageVersion; + private long id; - public Checkpoint(long checkpointLsn, long minMCTFirstLsn, long maxTxnId, long timeStamp, boolean sharp, + public Checkpoint(long id, long checkpointLsn, long minMCTFirstLsn, long maxTxnId, boolean sharp, int storageVersion) { + this.id = id; this.checkpointLsn = checkpointLsn; this.minMCTFirstLsn = minMCTFirstLsn; this.maxTxnId = maxTxnId; - this.timeStamp = timeStamp; this.sharp = sharp; this.storageVersion = storageVersion; } @@ -56,8 +57,8 @@ return maxTxnId; } - public long getTimeStamp() { - return timeStamp; + public long getId() { + return id; } public boolean isSharp() { @@ -69,68 +70,47 @@ } @Override - public int compareTo(Checkpoint checkpoint) { - long compareTimeStamp = checkpoint.getTimeStamp(); - - // Descending order - long diff = compareTimeStamp - this.timeStamp; - if (diff > 
0) { - return 1; - } else if (diff == 0) { - return 0; - } else { - return -1; - } + public int compareTo(Checkpoint other) { + return Long.compare(this.id, other.id); } @Override - public boolean equals(Object obj) { - if (this == obj) { + public boolean equals(Object o) { + if (this == o) { return true; } - if (obj == null) { + if (o == null || getClass() != o.getClass()) { return false; } - if (!(obj instanceof Checkpoint)) { - return false; - } - Checkpoint other = (Checkpoint) obj; - return compareTo(other) == 0; + Checkpoint that = (Checkpoint) o; + return id == that.id; } @Override public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + (int) (checkpointLsn ^ (checkpointLsn >>> 32)); - result = prime * result + Long.hashCode(maxTxnId); - result = prime * result + (int) (minMCTFirstLsn ^ (minMCTFirstLsn >>> 32)); - result = prime * result + (sharp ? 1231 : 1237); - result = prime * result + storageVersion; - result = prime * result + (int) (timeStamp ^ (timeStamp >>> 32)); - return result; + return Long.hashCode(id); } @Override public JsonNode toJson(IPersistedResourceRegistry registry) throws HyracksDataException { final ObjectNode checkpointJson = registry.getClassIdentifier(getClass(), serialVersionUID); + checkpointJson.put("id", id); checkpointJson.put("checkpointLsn", checkpointLsn); checkpointJson.put("minMCTFirstLsn", minMCTFirstLsn); checkpointJson.put("maxTxnId", maxTxnId); - checkpointJson.put("timeStamp", timeStamp); - checkpointJson.put("sharp", timeStamp); + checkpointJson.put("sharp", sharp); checkpointJson.put("storageVersion", storageVersion); return checkpointJson; } @SuppressWarnings("squid:S1172") // unused parameter public static IJsonSerializable fromJson(IPersistedResourceRegistry registry, JsonNode json) { + long id = json.get("id").asLong(); long checkpointLsn = json.get("checkpointLsn").asLong(); long minMCTFirstLsn = json.get("minMCTFirstLsn").asLong(); long maxTxnId = 
json.get("maxTxnId").asLong(); - long timeStamp = json.get("timeStamp").asLong(); boolean sharp = json.get("sharp").asBoolean(); int storageVersion = json.get("storageVersion").asInt(); - return new Checkpoint(checkpointLsn, minMCTFirstLsn, maxTxnId, timeStamp, sharp, storageVersion); + return new Checkpoint(id, checkpointLsn, minMCTFirstLsn, maxTxnId, sharp, storageVersion); } } \ No newline at end of file diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java index e3cf8b8..36cea55 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java @@ -18,7 +18,6 @@ */ package org.apache.asterix.common.transactions; -import org.apache.asterix.common.exceptions.ACIDException; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.lifecycle.ILifeCycleComponent; @@ -26,10 +25,8 @@ /** * @return The latest checkpoint on disk if any exists. Otherwise null. - * @throws ACIDException - * when a checkpoint file cannot be read. */ - Checkpoint getLatest() throws ACIDException; + Checkpoint getLatest(); /** * Performs a sharp checkpoint. diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java index 19b006f..644f3c0 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java @@ -41,7 +41,7 @@ /** * The storage version of AsterixDB related artifacts (e.g. log files, checkpoint files, etc..). 
*/ - private static final int LOCAL_STORAGE_VERSION = 4; + private static final int LOCAL_STORAGE_VERSION = 5; /** * The storage version of AsterixDB stack. diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java index 0cbd6c6..e221da8 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java @@ -60,6 +60,7 @@ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); public static final long SHARP_CHECKPOINT_LSN = -1; private static final FilenameFilter filter = (File dir, String name) -> name.startsWith(CHECKPOINT_FILENAME_PREFIX); + private static final long FIRST_CHECKPOINT_ID = 0; private final File checkpointDir; private final int historyToKeep; private final int lsnThreshold; @@ -88,25 +89,118 @@ lsnThreshold = checkpointProperties.getLsnThreshold(); pollFrequency = checkpointProperties.getPollFrequency(); // We must keep at least the latest checkpoint - historyToKeep = checkpointProperties.getHistoryToKeep() == 0 ? 
1 : checkpointProperties.getHistoryToKeep(); + historyToKeep = checkpointProperties.getHistoryToKeep() + 1; persistedResourceRegistry = txnSubsystem.getApplicationContext().getPersistedResourceRegistry(); } @Override - public Checkpoint getLatest() throws ACIDException { - // Read all checkpointObjects from the existing checkpoint files + public Checkpoint getLatest() { LOGGER.log(Level.INFO, "Getting latest checkpoint"); + final List<File> checkpointFiles = getCheckpointFiles(); + if (checkpointFiles.isEmpty()) { + return null; + } + final List<Checkpoint> orderedCheckpoints = getOrderedCheckpoints(checkpointFiles); + if (orderedCheckpoints.isEmpty()) { + /* + * If all checkpoint files are corrupted, we have no option but to try to perform recovery. + * We will forge a checkpoint that forces recovery to start from the beginning of the log. + * This shouldn't happen unless a hardware corruption happens. + */ + return forgeForceRecoveryCheckpoint(); + } + return orderedCheckpoints.get(orderedCheckpoints.size() - 1); + } + + @Override + public void start() { + checkpointer = new CheckpointThread(this, txnSubsystem.getLogManager(), lsnThreshold, pollFrequency); + checkpointer.start(); + } + + @Override + public void stop(boolean dumpState, OutputStream ouputStream) throws IOException { + checkpointer.shutdown(); + checkpointer.interrupt(); + try { + // Wait until checkpoint thread stops + checkpointer.join(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + @Override + public void dumpState(OutputStream os) throws IOException { + // Nothing to dump + } + + public Path getCheckpointPath(long checkpointId) { + return Paths.get(checkpointDir.getAbsolutePath() + File.separator + CHECKPOINT_FILENAME_PREFIX + + Long.toString(checkpointId)); + } + + protected void capture(long minMCTFirstLSN, boolean sharp) throws HyracksDataException { + ILogManager logMgr = txnSubsystem.getLogManager(); + ITransactionManager txnMgr = 
txnSubsystem.getTransactionManager(); + final long nextCheckpointId = getNextCheckpointId(); + final Checkpoint checkpointObject = new Checkpoint(nextCheckpointId, logMgr.getAppendLSN(), minMCTFirstLSN, + txnMgr.getMaxTxnId(), sharp, StorageConstants.VERSION); + persist(checkpointObject); + cleanup(); + } + + private Checkpoint forgeForceRecoveryCheckpoint() { + /* + * By setting the checkpoint first LSN (low watermark) to Long.MIN_VALUE, the recovery manager will start from + * the first available log. + * We set the storage version to the current version. If there is a version mismatch, it will be detected + * during recovery. + */ + return new Checkpoint(Long.MIN_VALUE, Long.MIN_VALUE, Integer.MIN_VALUE, FIRST_CHECKPOINT_ID, false, + StorageConstants.VERSION); + } + + private void persist(Checkpoint checkpoint) throws HyracksDataException { + // Get checkpoint file path + Path path = getCheckpointPath(checkpoint.getId()); + + if (LOGGER.isInfoEnabled()) { + File file = path.toFile(); + LOGGER.log(Level.INFO, "Persisting checkpoint file to " + file + " which " + + (file.exists() ? "already exists" : "doesn't exist yet")); + } + // Write checkpoint file to disk + try { + byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(checkpoint.toJson(persistedResourceRegistry)); + Files.write(path, bytes); + } catch (IOException e) { + LOGGER.log(Level.ERROR, "Failed to write checkpoint to disk", e); + throw HyracksDataException.create(e); + } + if (LOGGER.isInfoEnabled()) { + File file = path.toFile(); + LOGGER.log(Level.INFO, "Completed persisting checkpoint file to " + file + " which now " + + (file.exists() ? "exists" : " still doesn't exist")); + } + } + + private List<File> getCheckpointFiles() { File[] checkpoints = checkpointDir.listFiles(filter); if (checkpoints == null || checkpoints.length == 0) { if (LOGGER.isInfoEnabled()) { LOGGER.log(Level.INFO, "Listing of files in the checkpoint dir returned " + (checkpoints == null ? 
"null" : "empty")); } - return null; + return Collections.emptyList(); } if (LOGGER.isInfoEnabled()) { LOGGER.log(Level.INFO, "Listing of files in the checkpoint dir returned " + Arrays.toString(checkpoints)); } + return Arrays.asList(checkpoints); + } + + private List<Checkpoint> getOrderedCheckpoints(List<File> checkpoints) { List<Checkpoint> checkpointObjectList = new ArrayList<>(); for (File file : checkpoints) { try { @@ -134,106 +228,30 @@ } } } - /** - * If all checkpoint files are corrupted, we have no option but to try to perform recovery. - * We will forge a checkpoint that forces recovery to start from the beginning of the log. - * This shouldn't happen unless a hardware corruption happens. - */ - if (checkpointObjectList.isEmpty()) { - LOGGER.error("All checkpoint files are corrupted. Forcing recovery from the beginning of the log"); - checkpointObjectList.add(forgeForceRecoveryCheckpoint()); - } - - // Sort checkpointObjects in descending order by timeStamp to find out the most recent one. 
Collections.sort(checkpointObjectList); - - // Return the most recent one (the first one in sorted list) - return checkpointObjectList.get(0); - } - - @Override - public void start() { - checkpointer = new CheckpointThread(this, txnSubsystem.getLogManager(), lsnThreshold, pollFrequency); - checkpointer.start(); - } - - @Override - public void stop(boolean dumpState, OutputStream ouputStream) throws IOException { - checkpointer.shutdown(); - checkpointer.interrupt(); - try { - // Wait until checkpoint thread stops - checkpointer.join(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - - @Override - public void dumpState(OutputStream os) throws IOException { - // Nothing to dump - } - - public Path getCheckpointPath(long checkpointTimestamp) { - return Paths.get(checkpointDir.getAbsolutePath() + File.separator + CHECKPOINT_FILENAME_PREFIX - + Long.toString(checkpointTimestamp)); - } - - protected void capture(long minMCTFirstLSN, boolean sharp) throws HyracksDataException { - ILogManager logMgr = txnSubsystem.getLogManager(); - ITransactionManager txnMgr = txnSubsystem.getTransactionManager(); - Checkpoint checkpointObject = new Checkpoint(logMgr.getAppendLSN(), minMCTFirstLSN, txnMgr.getMaxTxnId(), - System.currentTimeMillis(), sharp, StorageConstants.VERSION); - persist(checkpointObject); - cleanup(); - } - - protected Checkpoint forgeForceRecoveryCheckpoint() { - /** - * By setting the checkpoint first LSN (low watermark) to Long.MIN_VALUE, the recovery manager will start from - * the first available log. - * We set the storage version to the current version. If there is a version mismatch, it will be detected - * during recovery. 
- */ - return new Checkpoint(Long.MIN_VALUE, Long.MIN_VALUE, Integer.MIN_VALUE, System.currentTimeMillis(), false, - StorageConstants.VERSION); - } - - private void persist(Checkpoint checkpoint) throws HyracksDataException { - // Get checkpoint file path - Path path = getCheckpointPath(checkpoint.getTimeStamp()); - - if (LOGGER.isInfoEnabled()) { - File file = path.toFile(); - LOGGER.log(Level.INFO, "Persisting checkpoint file to " + file + " which " - + (file.exists() ? "already exists" : "doesn't exist yet")); - } - // Write checkpoint file to disk - try { - byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(checkpoint.toJson(persistedResourceRegistry)); - Files.write(path, bytes); - } catch (IOException e) { - LOGGER.log(Level.ERROR, "Failed to write checkpoint to disk", e); - throw HyracksDataException.create(e); - } - if (LOGGER.isInfoEnabled()) { - File file = path.toFile(); - LOGGER.log(Level.INFO, "Completed persisting checkpoint file to " + file + " which now " - + (file.exists() ? "exists" : " still doesn't exist")); - } + return checkpointObjectList; } private void cleanup() { - File[] checkpointFiles = checkpointDir.listFiles(filter); - // Sort the filenames lexicographically to keep the latest checkpoint history files. 
- Arrays.sort(checkpointFiles); - for (int i = 0; i < checkpointFiles.length - historyToKeep; i++) { - if (LOGGER.isWarnEnabled()) { - LOGGER.warn("Deleting checkpoint file at: " + checkpointFiles[i].getAbsolutePath()); - } - if (!checkpointFiles[i].delete() && LOGGER.isWarnEnabled()) { - LOGGER.warn("Could not delete checkpoint file at: " + checkpointFiles[i].getAbsolutePath()); + final List<File> checkpointFiles = getCheckpointFiles(); + final List<Checkpoint> orderedCheckpoints = getOrderedCheckpoints(checkpointFiles); + final int deleteCount = orderedCheckpoints.size() - historyToKeep; + for (int i = 0; i < deleteCount; i++) { + final Checkpoint checkpoint = orderedCheckpoints.get(i); + final Path checkpointPath = getCheckpointPath(checkpoint.getId()); + LOGGER.warn("Deleting checkpoint file at: {}", checkpointPath); + if (!checkpointPath.toFile().delete()) { + LOGGER.warn("Could not delete checkpoint file at: {}", checkpointPath); } } } + + private long getNextCheckpointId() { + final Checkpoint latest = getLatest(); + if (latest == null) { + return FIRST_CHECKPOINT_ID; + } + return latest.getId() + 1; + } + } \ No newline at end of file diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java index 6efd0e5..ce523db 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java @@ -38,7 +38,7 @@ public class CheckpointManager extends AbstractCheckpointManager { private static final Logger LOGGER = LogManager.getLogger(); - private static final long NO_SECURED_LSN = -1l; + private static final long NO_SECURED_LSN = -1L; private final Map<TxnId, 
Long> securedLSNs; public CheckpointManager(ITransactionSubsystem txnSubsystem, CheckpointProperties checkpointProperties) { diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITreeIndexFrame.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITreeIndexFrame.java index 7ac49c6..dc59612 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITreeIndexFrame.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITreeIndexFrame.java @@ -36,7 +36,7 @@ * Storage version #. Change this if you alter any tree frame formats to stop * possible corruption from old versions reading new formats. */ - public static final int VERSION = 6; + public static final int VERSION = 7; public static final int TUPLE_COUNT_OFFSET = 0; public static final int FREE_SPACE_OFFSET = TUPLE_COUNT_OFFSET + 4; public static final int LEVEL_OFFSET = FREE_SPACE_OFFSET + 4; -- To view, visit https://asterix-gerrit.ics.uci.edu/2961 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: Idc061e6eaccfb308b29a5a263b77a0a849694d4f Gerrit-PatchSet: 2 Gerrit-Project: asterixdb Gerrit-Branch: stabilization-f69489 Gerrit-Owner: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]>
