Murtadha Hubail has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1380
Change subject: Introduce CheckpointManager API
......................................................................
Introduce CheckpointManager API
This change includes the following:
- s/CheckpointObject/Checkpoint
- Add AsterixDB storage version to checkpoints.
- Prevent any txn log access when a storage version mismatch is detected.
- Introduce CheckpointManager API and CheckpointProperties.
- Properly stop checkpointing thread on instance shutdown.
- Separate checkpointing logic when replication is enabled/disabled.
Change-Id: I36c00ca195b93bbe1e53f39bb4a3b5a344657f0d
---
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
R
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java
A
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/CheckpointProperties.java
A
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionSubsystem.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
M
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
A
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java
A
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
A
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManagerFactory.java
M
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java
M
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
A
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/ReplicationCheckpointManager.java
M
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
M
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
20 files changed, 627 insertions(+), 285 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/80/1380/1
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java
index af50b71..0ba750b 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java
@@ -278,7 +278,7 @@
lccm.register((ILifeCycleComponent) datasetLifecycleManager);
lccm.register((ILifeCycleComponent)
txnSubsystem.getTransactionManager());
lccm.register((ILifeCycleComponent) txnSubsystem.getLockManager());
-
+ lccm.register(txnSubsystem.getCheckpointManager());
}
@Override
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index ea1f714..feacb8a 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -37,6 +37,7 @@
import org.apache.asterix.common.config.IAsterixPropertiesProvider;
import org.apache.asterix.common.config.MessagingProperties;
import org.apache.asterix.common.replication.IRemoteRecoveryManager;
+import org.apache.asterix.common.transactions.ICheckpointManager;
import org.apache.asterix.common.transactions.IRecoveryManager;
import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
import org.apache.asterix.common.utils.PrintUtil;
@@ -47,7 +48,6 @@
import org.apache.asterix.messaging.NCMessageBroker;
import org.apache.asterix.runtime.message.ReportMaxResourceIdMessage;
import
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
-import
org.apache.asterix.transaction.management.service.recovery.RecoveryManager;
import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.application.INCApplicationContext;
import org.apache.hyracks.api.application.INCApplicationEntryPoint;
@@ -249,8 +249,8 @@
lccm.startAll();
if (!pendingFailbackCompletion) {
- IRecoveryManager recoveryMgr =
runtimeContext.getTransactionSubsystem().getRecoveryManager();
- recoveryMgr.checkpoint(true,
RecoveryManager.NON_SHARP_CHECKPOINT_TARGET_LSN);
+ ICheckpointManager checkpointyMgr =
runtimeContext.getTransactionSubsystem().getCheckpointManager();
+ checkpointyMgr.doSharpCheckpoint();
if (isMetadataNode) {
runtimeContext.exportMetadataNodeStub();
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
index 679d96e..5465973 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
@@ -30,6 +30,7 @@
import org.apache.asterix.common.configuration.Property;
import
org.apache.asterix.common.dataflow.AsterixLSMInsertDeleteOperatorNodePushable;
import org.apache.asterix.common.transactions.DatasetId;
+import org.apache.asterix.common.transactions.ICheckpointManager;
import org.apache.asterix.common.transactions.IRecoveryManager;
import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.external.util.DataflowUtils;
@@ -125,6 +126,7 @@
FrameTupleAppender tupleAppender = new
FrameTupleAppender(frame);
IRecoveryManager recoveryManager =
nc.getTransactionSubsystem().getRecoveryManager();
+ ICheckpointManager checkpointManager =
nc.getTransactionSubsystem().getCheckpointManager();
LogManager logManager = (LogManager)
nc.getTransactionSubsystem().getLogManager();
// Number of log files after node startup should be one
int numberOfLogFiles = logManager.getLogFileIds().size();
@@ -154,7 +156,7 @@
* recovery)
*/
int numberOfLogFilesBeforeCheckpoint =
logManager.getLogFileIds().size();
- recoveryManager.checkpoint(false,
logManager.getAppendLSN());
+ checkpointManager.tryCheckpoint(logManager.getAppendLSN());
int numberOfLogFilesAfterCheckpoint =
logManager.getLogFileIds().size();
Assert.assertEquals(numberOfLogFilesBeforeCheckpoint,
numberOfLogFilesAfterCheckpoint);
@@ -175,7 +177,7 @@
* At this point, the low-water mark is not in the
initialLowWaterMarkFileId, so
* a checkpoint should delete it.
*/
- recoveryManager.checkpoint(false,
recoveryManager.getMinFirstLSN());
+
checkpointManager.tryCheckpoint(recoveryManager.getMinFirstLSN());
// Validate initialLowWaterMarkFileId was deleted
for (Long fileId : logManager.getLogFileIds()) {
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointObject.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java
similarity index 76%
rename from
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointObject.java
rename to
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java
index 3356298..3375511 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointObject.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java
@@ -16,11 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.transaction.management.service.recovery;
+package org.apache.asterix.common.transactions;
import java.io.Serializable;
-public class CheckpointObject implements Serializable,
Comparable<CheckpointObject> {
+public class Checkpoint implements Serializable, Comparable<Checkpoint> {
private static final long serialVersionUID = 1L;
@@ -29,13 +29,16 @@
private final int maxJobId;
private final long timeStamp;
private final boolean sharp;
+ private final int storageVersion;
- public CheckpointObject(long checkpointLsn, long minMCTFirstLsn, int
maxJobId, long timeStamp, boolean sharp) {
+ public Checkpoint(long checkpointLsn, long minMCTFirstLsn, int maxJobId,
long timeStamp, boolean sharp,
+ int storageVersion) {
this.checkpointLsn = checkpointLsn;
this.minMCTFirstLsn = minMCTFirstLsn;
this.maxJobId = maxJobId;
this.timeStamp = timeStamp;
this.sharp = sharp;
+ this.storageVersion = storageVersion;
}
public long getCheckpointLsn() {
@@ -58,11 +61,15 @@
return sharp;
}
- @Override
- public int compareTo(CheckpointObject checkpointObject) {
- long compareTimeStamp = checkpointObject.getTimeStamp();
+ public int getStorageVersion() {
+ return storageVersion;
+ }
- //decending order
+ @Override
+ public int compareTo(Checkpoint checkpoint) {
+ long compareTimeStamp = checkpoint.getTimeStamp();
+
+ // Descending order
long diff = compareTimeStamp - this.timeStamp;
if (diff > 0) {
return 1;
@@ -71,8 +78,5 @@
} else {
return -1;
}
-
- //ascending order
- //return this.timeStamp - compareTimeStamp;
}
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/CheckpointProperties.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/CheckpointProperties.java
new file mode 100644
index 0000000..96c9d47
--- /dev/null
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/CheckpointProperties.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.transactions;
+
+import org.apache.asterix.common.config.AsterixTransactionProperties;
+
+public class CheckpointProperties {
+
+ private final String checkpointDirPath;
+ private final int lsnThreshold;
+ private final int pollFrequency;
+ private final int historyToKeep;
+
+ public CheckpointProperties(AsterixTransactionProperties txnProperties,
String nodeId) {
+ // Currently we use the log files directory for checkpoints
+ checkpointDirPath = txnProperties.getLogDirectory(nodeId);
+ lsnThreshold = txnProperties.getCheckpointLSNThreshold();
+ pollFrequency = txnProperties.getCheckpointPollFrequency();
+ historyToKeep = txnProperties.getCheckpointHistory();
+ }
+
+ public int getLsnThreshold() {
+ return lsnThreshold;
+ }
+
+ public int getPollFrequency() {
+ return pollFrequency;
+ }
+
+ public int getHistoryToKeep() {
+ return historyToKeep;
+ }
+
+ public String getCheckpointDirPath() {
+ return checkpointDirPath;
+ }
+}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
new file mode 100644
index 0000000..9e7eb0d
--- /dev/null
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.transactions;
+
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
+
+public interface ICheckpointManager extends ILifeCycleComponent {
+
+ /**
+ * @return The latest checkpoint on disk if any exists. Otherwise null.
+ * @throws ACIDException
+ * when a checkpoint file cannot be read.
+ */
+ Checkpoint getLatest() throws ACIDException;
+
+ /**
+ * Performs a sharp checkpoint.
+ *
+ * @throws HyracksDataException
+ */
+ void doSharpCheckpoint() throws HyracksDataException;
+
+ /**
+ * Attempts to perform a soft checkpoint at the specified {@code
checkpointTargetLSN}.
+ *
+ * @param checkpointTargetLSN
+ * @return The LSN recorded on the captured checkpoint.
+ * @throws HyracksDataException
+ */
+ long tryCheckpoint(long checkpointTargetLSN) throws HyracksDataException;
+}
\ No newline at end of file
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
index 97d4897..aa018ba 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
@@ -105,4 +105,9 @@
* @throws IOException
*/
public void closeLogFile(TxnLogFile logFileRef, FileChannel fileChannel)
throws IOException;
+
+ /**
+ * Deletes all current log files and starts the next log file partition
+ */
+ void renewLogFiles();
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
index a3115e7..6816116 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
@@ -82,20 +82,6 @@
public void rollbackTransaction(ITransactionContext txnContext) throws
ACIDException;
/**
- * Makes a system checkpoint.
- *
- * @param isSharpCheckpoint
- * a flag indicating whether to perform a sharp or non-sharp
checkpoint.
- * @param nonSharpCheckpointTargetLSN
- * if a non-sharp checkpoint to be performed, what is the
minimum LSN it should target.
- * @return the LSN at which the checkpoint was performed.
- * @throws ACIDException
- * @throws HyracksDataException
- */
- public long checkpoint(boolean isSharpCheckpoint, long
nonSharpCheckpointTargetLSN)
- throws ACIDException, HyracksDataException;
-
- /**
* @return min first LSN of the open indexes (including remote indexes if
replication is enabled)
* @throws HyracksDataException
*/
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
index c81faf0..4a0d500 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
@@ -124,4 +124,10 @@
*/
public ITransactionSubsystem getTransactionProvider();
+ /**
+ *
+ * @return The current max job id.
+ */
+ int getMaxJobId();
+
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionSubsystem.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionSubsystem.java
index 4ce84b5..072efbb 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionSubsystem.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionSubsystem.java
@@ -18,7 +18,6 @@
*/
package org.apache.asterix.common.transactions;
-
public interface ITransactionSubsystem {
public ILogManager getLogManager();
@@ -33,4 +32,5 @@
public String getId();
+ public ICheckpointManager getCheckpointManager();
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
index 382c94b..a885e93 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
@@ -18,12 +18,19 @@
*/
package org.apache.asterix.common.utils;
+import org.apache.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
+
/**
* A static class that stores storage constants
*/
public class StorageConstants {
public static final String METADATA_ROOT = "root_metadata";
+ /** The storage version of AsterixDB related artifacts (e.g. log files,
checkpoint files, etc.). */
+ private static final int LOCAL_STORAGE_VERSION = 1;
+
+ /** The storage version of AsterixDB stack. */
+ public static final int VERSION = LOCAL_STORAGE_VERSION +
ITreeIndexMetaDataFrame.VERSION;
private StorageConstants() {
}
-}
+}
\ No newline at end of file
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index 57d5c39..25096f6 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -351,7 +351,8 @@
return logFileSize * fileId + offset;
}
- public void renewLogFiles() throws IOException {
+ @Override
+ public void renewLogFiles() {
terminateLogFlusher();
long lastMaxLogFileId = deleteAllLogFiles();
initializeLogManager(lastMaxLogFileId + 1);
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java
new file mode 100644
index 0000000..0b86ea5
--- /dev/null
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.recovery;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.Checkpoint;
+import org.apache.asterix.common.transactions.CheckpointProperties;
+import org.apache.asterix.common.transactions.ICheckpointManager;
+import org.apache.asterix.common.transactions.ILogManager;
+import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.asterix.common.utils.StorageConstants;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * An abstract implementation of {@link ICheckpointManager}.
+ * The AbstractCheckpointManager contains the implementation of
+ * the base operations on checkpoints such as persisting and deleting them.
+ */
+public abstract class AbstractCheckpointManager implements ICheckpointManager {
+
+ private static final Logger LOGGER =
Logger.getLogger(AbstractCheckpointManager.class.getName());
+ private static final String CHECKPOINT_FILENAME_PREFIX = "checkpoint_";
+ public static final long SHARP_CHECKPOINT_LSN = -1;
+ private static final FilenameFilter filter = (File dir, String name) ->
name.startsWith(CHECKPOINT_FILENAME_PREFIX);
+ private final File checkpointDir;
+ private final int historyToKeep;
+ private final int lsnThreshold;
+ private final int pollFrequency;
+ protected final ITransactionSubsystem txnSubsystem;
+ private CheckpointThread checkpointer;
+
+ public AbstractCheckpointManager(ITransactionSubsystem txnSubsystem,
CheckpointProperties checkpointProperties) {
+ this.txnSubsystem = txnSubsystem;
+ String checkpointDirPath = checkpointProperties.getCheckpointDirPath();
+ if (!checkpointDirPath.endsWith(File.separator)) {
+ checkpointDirPath += File.separator;
+ }
+ checkpointDir = new File(checkpointDirPath);
+ // Create the checkpoint directory if missing
+ if (!checkpointDir.exists()) {
+ (new File(checkpointDirPath)).mkdir();
+ }
+ lsnThreshold = checkpointProperties.getLsnThreshold();
+ pollFrequency = checkpointProperties.getPollFrequency();
+ // We must keep at least the latest checkpoint
+ historyToKeep = checkpointProperties.getHistoryToKeep() == 0 ? 1 :
checkpointProperties.getHistoryToKeep();
+ }
+
+ @Override
+ public Checkpoint getLatest() throws ACIDException {
+ // Read all checkpointObjects from the existing checkpoint files
+ File[] checkpoints = checkpointDir.listFiles(filter);
+ if (checkpoints == null || checkpoints.length == 0) {
+ return null;
+ }
+
+ Checkpoint checkpointObject;
+ List<Checkpoint> checkpointObjectList = new ArrayList<>();
+ for (File file : checkpoints) {
+ try (FileInputStream fis = new FileInputStream(file);
+ ObjectInputStream oisFromFis = new ObjectInputStream(fis))
{
+ checkpointObject = (Checkpoint) oisFromFis.readObject();
+ checkpointObjectList.add(checkpointObject);
+ } catch (IOException | ClassNotFoundException e) {
+ throw new ACIDException("Failed to read a checkpoint file", e);
+ }
+ }
+ // Sort checkpointObjects in descending order by timeStamp to find out
the most recent one.
+ Collections.sort(checkpointObjectList);
+
+ // Return the most recent one (the first one in sorted list)
+ return checkpointObjectList.get(0);
+ }
+
+ @Override
+ public void start() {
+ checkpointer = new CheckpointThread(this,
txnSubsystem.getLogManager(), lsnThreshold, pollFrequency);
+ checkpointer.start();
+ }
+
+ @Override
+ public void stop(boolean dumpState, OutputStream ouputStream) throws
IOException {
+ checkpointer.shutdown();
+ checkpointer.interrupt();
+ try {
+ // Wait until checkpoint thread stops
+ checkpointer.join();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ @Override
+ public void dumpState(OutputStream os) throws IOException {
+ // Nothing to dump
+ }
+
+ protected void capture(long minMCTFirstLSN, boolean sharp) throws
HyracksDataException {
+ ILogManager logMgr = txnSubsystem.getLogManager();
+ ITransactionManager txnMgr = txnSubsystem.getTransactionManager();
+ Checkpoint checkpointObject = new Checkpoint(logMgr.getAppendLSN(),
minMCTFirstLSN, txnMgr.getMaxJobId(),
+ System.currentTimeMillis(), sharp, StorageConstants.VERSION);
+ persist(checkpointObject);
+ cleanup();
+ }
+
+ private void persist(Checkpoint checkpoint) throws HyracksDataException {
+ // Construct checkpoint file name
+ String fileName = checkpointDir.getAbsolutePath() + File.separator +
CHECKPOINT_FILENAME_PREFIX
+ + Long.toString(checkpoint.getTimeStamp());
+ //TODO: replace java serialization
+ // Write checkpoint file to disk
+ try (FileOutputStream fos = new FileOutputStream(fileName);
+ ObjectOutputStream oosToFos = new ObjectOutputStream(fos)) {
+ oosToFos.writeObject(checkpoint);
+ oosToFos.flush();
+ } catch (IOException e) {
+ throw new HyracksDataException("Failed to write checkpoint to
disk", e);
+ }
+ }
+
+ private void cleanup() {
+ File[] checkpointFiles = checkpointDir.listFiles(filter);
+ // Sort the filenames lexicographically to keep the latest checkpoint
history files.
+ Arrays.sort(checkpointFiles);
+ for (int i = 0; i < checkpointFiles.length - historyToKeep; i++) {
+ if (!checkpointFiles[i].delete()) {
+ LOGGER.warning("Could not delete checkpoint file at: " +
checkpointFiles[i].getAbsolutePath());
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
new file mode 100644
index 0000000..ea711a5
--- /dev/null
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.recovery;
+
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.transactions.CheckpointProperties;
+import org.apache.asterix.common.transactions.ICheckpointManager;
+import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * An implementation of {@link ICheckpointManager} that defines the logic
+ * of checkpoints.
+ */
+public class CheckpointManager extends AbstractCheckpointManager {
+
+ private static final Logger LOGGER =
Logger.getLogger(CheckpointManager.class.getName());
+
+ public CheckpointManager(ITransactionSubsystem txnSubsystem,
CheckpointProperties checkpointProperties) {
+ super(txnSubsystem, checkpointProperties);
+ }
+
+ /**
+ * Performs a sharp checkpoint. All datasets are flushed and all
transaction
+ * log files are deleted.
+ */
+ @Override
+ public synchronized void doSharpCheckpoint() throws HyracksDataException {
+ LOGGER.info("Starting sharp checkpoint...");
+ final IDatasetLifecycleManager datasetLifecycleManager =
txnSubsystem.getAsterixAppRuntimeContextProvider()
+ .getDatasetLifecycleManager();
+ datasetLifecycleManager.flushAllDatasets();
+ capture(SHARP_CHECKPOINT_LSN, true);
+ txnSubsystem.getLogManager().renewLogFiles();
+ LOGGER.info("Completed sharp checkpoint.");
+ }
+
+ /***
+ * Attempts to perform a soft checkpoint at the specified {@code
checkpointTargetLSN}.
+ * If a checkpoint cannot be captured due to datasets having LSN < {@code
checkpointTargetLSN},
+ * an asynchronous flush is triggered on them. When a checkpoint is
successful, all transaction
+ * log files that end with LSN < {@code checkpointTargetLSN} are deleted.
+ */
+ @Override
+ public synchronized long tryCheckpoint(long checkpointTargetLSN) throws
HyracksDataException {
+ LOGGER.info("Attemping soft checkpoint...");
+ final long minFirstLSN =
txnSubsystem.getRecoveryManager().getMinFirstLSN();
+ boolean checkpointSucceeded = minFirstLSN >= checkpointTargetLSN;
+ if (!checkpointSucceeded) {
+ // Flush datasets with indexes behind target checkpoint LSN
+ IDatasetLifecycleManager datasetLifecycleManager =
txnSubsystem.getAsterixAppRuntimeContextProvider()
+ .getDatasetLifecycleManager();
+
datasetLifecycleManager.scheduleAsyncFlushForLaggingDatasets(checkpointTargetLSN);
+ }
+ capture(minFirstLSN, false);
+ if (checkpointSucceeded) {
+ txnSubsystem.getLogManager().deleteOldLogFiles(minFirstLSN);
+ LOGGER.info(String.format("soft checkpoint succeeded at LSN(%s)",
minFirstLSN));
+ }
+ return minFirstLSN;
+ }
+}
\ No newline at end of file
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManagerFactory.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManagerFactory.java
new file mode 100644
index 0000000..e97ffec
--- /dev/null
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManagerFactory.java
@@ -0,0 +1,21 @@
+package org.apache.asterix.transaction.management.service.recovery;
+
+import org.apache.asterix.common.transactions.CheckpointProperties;
+import org.apache.asterix.common.transactions.ICheckpointManager;
+import org.apache.asterix.common.transactions.ITransactionSubsystem;
+
+public class CheckpointManagerFactory {
+
+ private CheckpointManagerFactory() {
+ throw new AssertionError();
+ }
+
+ public static ICheckpointManager create(ITransactionSubsystem txnSubsystem,
+ CheckpointProperties checkpointProperties, boolean
replicationEnabled) {
+ if (!replicationEnabled) {
+ return new CheckpointManager(txnSubsystem, checkpointProperties);
+ } else {
+ return new ReplicationCheckpointManager(txnSubsystem,
checkpointProperties);
+ }
+ }
+}
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java
index 851289e..c044890 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java
@@ -18,15 +18,18 @@
*/
package org.apache.asterix.transaction.management.service.recovery;
-import java.io.IOError;
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.ICheckpointManager;
import org.apache.asterix.common.transactions.ILogManager;
-import org.apache.asterix.common.transactions.IRecoveryManager;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+/**
+ * A daemon thread that periodically attempts to perform checkpoints.
+ * A checkpoint attempt is made when the volume of transaction logs written
+ * since the last successful checkpoint exceeds a certain threshold.
+ */
public class CheckpointThread extends Thread {
private static final Logger LOGGER =
Logger.getLogger(CheckpointThread.class.getName());
@@ -34,33 +37,35 @@
private long checkpointTermInSecs;
private final ILogManager logManager;
- private final IRecoveryManager recoveryMgr;
+ private final ICheckpointManager checkpointManager;
+ volatile boolean shouldRun = true;
- public CheckpointThread(IRecoveryManager recoveryMgr, ILogManager
logManager,
- long lsnThreshold, long checkpointTermInSecs) {
- this.recoveryMgr = recoveryMgr;
+ public CheckpointThread(ICheckpointManager checkpointManager, ILogManager
logManager, long lsnThreshold,
+ long checkpointTermInSecs) {
+ this.checkpointManager = checkpointManager;
this.logManager = logManager;
this.lsnThreshold = lsnThreshold;
this.checkpointTermInSecs = checkpointTermInSecs;
+ setDaemon(true);
}
@Override
public void run() {
-
Thread.currentThread().setName("Checkpoint Thread");
-
long currentCheckpointAttemptMinLSN;
long lastCheckpointLSN = -1;
long currentLogLSN;
long targetCheckpointLSN;
- while (true) {
+ while (shouldRun) {
+ // Wait one poll interval (checkpointTermInSecs is in seconds, sleep() takes milliseconds)
try {
sleep(checkpointTermInSecs * 1000);
} catch (InterruptedException e) {
+ // Restore the interrupt status; the shouldRun check that follows decides whether to exit
Thread.currentThread().interrupt();
- //ignore
}
-
+ if (!shouldRun) {
+ return;
+ }
if (lastCheckpointLSN == -1) {
try {
//Since the system just started up after sharp checkpoint,
@@ -84,18 +89,20 @@
//3. next time checkpoint comes, it will be able to remove
log files which have end range less than current targetCheckpointLSN
targetCheckpointLSN = lastCheckpointLSN + lsnThreshold;
- currentCheckpointAttemptMinLSN =
recoveryMgr.checkpoint(false, targetCheckpointLSN);
+ currentCheckpointAttemptMinLSN =
checkpointManager.tryCheckpoint(targetCheckpointLSN);
//checkpoint was completed at target LSN or above
if (currentCheckpointAttemptMinLSN >= targetCheckpointLSN)
{
lastCheckpointLSN = currentCheckpointAttemptMinLSN;
}
-
- } catch (ACIDException | HyracksDataException e) {
- throw new IOError(e);
+ } catch (HyracksDataException e) {
+ LOGGER.log(Level.SEVERE, "Error during checkpoint", e);
}
}
}
}
+ /**
+ * Asks this thread to stop by clearing {@code shouldRun}. The run loop reads
+ * the flag as its loop condition and again right after each sleep.
+ * This method only flips the flag; it does not interrupt the thread.
+ */
+ public void shutdown() {
+ shouldRun = false;
+ }
}
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
index 1f8d698..cb19a2c 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -20,13 +20,9 @@
import java.io.File;
import java.io.FileInputStream;
-import java.io.FileNotFoundException;
import java.io.FileOutputStream;
-import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
@@ -34,7 +30,6 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -48,15 +43,13 @@
import java.util.logging.Logger;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
-import org.apache.asterix.common.cluster.ClusterPartition;
-import org.apache.asterix.common.config.AsterixMetadataProperties;
import org.apache.asterix.common.config.ClusterProperties;
-import org.apache.asterix.common.config.IAsterixPropertiesProvider;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
import org.apache.asterix.common.replication.IReplicaResourcesManager;
-import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.asterix.common.transactions.Checkpoint;
import
org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
+import org.apache.asterix.common.transactions.ICheckpointManager;
import org.apache.asterix.common.transactions.ILogReader;
import org.apache.asterix.common.transactions.ILogRecord;
import org.apache.asterix.common.transactions.IRecoveryManager;
@@ -66,7 +59,6 @@
import
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.asterix.transaction.management.service.logging.LogManager;
import
org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants;
-import
org.apache.asterix.transaction.management.service.transaction.TransactionManager;
import
org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -89,30 +81,22 @@
private static final Logger LOGGER =
Logger.getLogger(RecoveryManager.class.getName());
private final TransactionSubsystem txnSubsystem;
private final LogManager logMgr;
- private final int checkpointHistory;
- private final long SHARP_CHECKPOINT_LSN = -1;
private final boolean replicationEnabled;
- public static final long NON_SHARP_CHECKPOINT_TARGET_LSN = -1;
private static final String RECOVERY_FILES_DIR_NAME = "recovery_temp";
private Map<Integer, JobEntityCommits> jobId2WinnerEntitiesMap = null;
private final long cachedEntityCommitsPerJobSize;
private final PersistentLocalResourceRepository localResourceRepository;
-
- /**
- * A file at a known location that contains the LSN of the last log record
- * traversed doing a successful checkpoint.
- */
- private static final String CHECKPOINT_FILENAME_PREFIX = "checkpoint_";
+ private final ICheckpointManager checkpointManager;
private SystemState state;
public RecoveryManager(TransactionSubsystem txnSubsystem) {
this.txnSubsystem = txnSubsystem;
logMgr = (LogManager) txnSubsystem.getLogManager();
- checkpointHistory =
txnSubsystem.getTransactionProperties().getCheckpointHistory();
replicationEnabled = ClusterProperties.INSTANCE.isReplicationEnabled();
localResourceRepository = (PersistentLocalResourceRepository)
txnSubsystem.getAsterixAppRuntimeContextProvider()
.getLocalResourceRepository();
cachedEntityCommitsPerJobSize =
txnSubsystem.getTransactionProperties().getJobRecoveryMemorySize();
+ checkpointManager = txnSubsystem.getCheckpointManager();
}
/**
@@ -126,10 +110,8 @@
@Override
public SystemState getSystemState() throws ACIDException {
//read checkpoint file
- CheckpointObject checkpointObject = null;
- try {
- checkpointObject = readCheckpoint();
- } catch (FileNotFoundException e) {
+ Checkpoint checkpointObject = checkpointManager.getLatest();
+ if (checkpointObject == null) {
//The checkpoint file doesn't exist => Failure happened during NC
initialization.
//Retry to initialize the NC by setting the state to NEW_UNIVERSE
state = SystemState.NEW_UNIVERSE;
@@ -140,7 +122,7 @@
}
if (replicationEnabled) {
- if (checkpointObject.getMinMCTFirstLsn() == SHARP_CHECKPOINT_LSN) {
+ if (checkpointObject.getMinMCTFirstLsn() ==
AbstractCheckpointManager.SHARP_CHECKPOINT_LSN) {
//no logs exist
state = SystemState.HEALTHY;
return state;
@@ -156,14 +138,14 @@
} else {
long readableSmallestLSN = logMgr.getReadableSmallestLSN();
if (logMgr.getAppendLSN() == readableSmallestLSN) {
- if (checkpointObject.getMinMCTFirstLsn() !=
SHARP_CHECKPOINT_LSN) {
+ if (checkpointObject.getMinMCTFirstLsn() !=
AbstractCheckpointManager.SHARP_CHECKPOINT_LSN) {
LOGGER.warning("Some(or all) of transaction log files are
lost.");
//No choice but continuing when the log files are lost.
}
state = SystemState.HEALTHY;
return state;
} else if (checkpointObject.getCheckpointLsn() ==
logMgr.getAppendLSN()
- && checkpointObject.getMinMCTFirstLsn() ==
SHARP_CHECKPOINT_LSN) {
+ && checkpointObject.getMinMCTFirstLsn() ==
AbstractCheckpointManager.SHARP_CHECKPOINT_LSN) {
state = SystemState.HEALTHY;
return state;
} else {
@@ -180,7 +162,7 @@
LOGGER.log(Level.INFO, "starting recovery ...");
long readableSmallestLSN = logMgr.getReadableSmallestLSN();
- CheckpointObject checkpointObject = readCheckpoint();
+ Checkpoint checkpointObject = checkpointManager.getLatest();
long lowWaterMarkLSN = checkpointObject.getMinMCTFirstLsn();
if (lowWaterMarkLSN < readableSmallestLSN) {
lowWaterMarkLSN = readableSmallestLSN;
@@ -372,8 +354,8 @@
//#. get maxDiskLastLSN
ILSMIndex lsmIndex = index;
try {
- maxDiskLastLsn =
-
((AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback())
+ maxDiskLastLsn =
((AbstractLSMIOOperationCallback) lsmIndex
+ .getIOOperationCallback())
.getComponentLSN(lsmIndex.getImmutableComponents());
} catch (HyracksDataException e) {
datasetLifecycleManager.close(localResource.getPath());
@@ -422,129 +404,6 @@
}
@Override
- public synchronized long checkpoint(boolean isSharpCheckpoint, long
nonSharpCheckpointTargetLSN)
- throws ACIDException, HyracksDataException {
- long minMCTFirstLSN;
- boolean nonSharpCheckpointSucceeded = false;
-
- if (isSharpCheckpoint) {
- LOGGER.log(Level.INFO, "Starting sharp checkpoint ... ");
- }
-
- TransactionManager txnMgr = (TransactionManager)
txnSubsystem.getTransactionManager();
- String logDir = logMgr.getLogManagerProperties().getLogDir();
-
- //get the filename of the previous checkpoint files which are about to
be deleted
- //right after the new checkpoint file is written.
- File[] prevCheckpointFiles = getPreviousCheckpointFiles();
-
- IDatasetLifecycleManager datasetLifecycleManager =
-
txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager();
- //flush all in-memory components if it is the sharp checkpoint
- if (isSharpCheckpoint) {
- datasetLifecycleManager.flushAllDatasets();
- if (!replicationEnabled) {
- minMCTFirstLSN = SHARP_CHECKPOINT_LSN;
- } else {
- //if is shutting down, need to check if we need to keep any
remote logs for dead replicas
- if
(txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext().isShuttingdown())
{
- Set<String> deadReplicaIds =
txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext()
- .getReplicationManager().getDeadReplicasIds();
- if (deadReplicaIds.isEmpty()) {
- minMCTFirstLSN = SHARP_CHECKPOINT_LSN;
- } else {
- //get min LSN of dead replicas remote resources
- IReplicaResourcesManager remoteResourcesManager =
txnSubsystem
-
.getAsterixAppRuntimeContextProvider().getAppContext().getReplicaResourcesManager();
- IAsterixPropertiesProvider propertiesProvider =
(IAsterixPropertiesProvider) txnSubsystem
-
.getAsterixAppRuntimeContextProvider().getAppContext();
- AsterixMetadataProperties metadataProperties =
propertiesProvider.getMetadataProperties();
- Set<Integer> deadReplicasPartitions = new HashSet<>();
- //get partitions of the dead replicas that are not
active on this node
- for (String deadReplicaId : deadReplicaIds) {
- ClusterPartition[] nodePartitons =
-
metadataProperties.getNodePartitions().get(deadReplicaId);
- for (ClusterPartition partition : nodePartitons) {
- if
(!localResourceRepository.getActivePartitions()
- .contains(partition.getPartitionId()))
{
-
deadReplicasPartitions.add(partition.getPartitionId());
- }
- }
- }
- minMCTFirstLSN =
remoteResourcesManager.getPartitionsMinLSN(deadReplicasPartitions);
- }
- } else {
- //start up complete checkpoint. Avoid deleting remote
recovery logs.
- minMCTFirstLSN = getMinFirstLSN();
- }
- }
- } else {
- minMCTFirstLSN = getMinFirstLSN();
- if (minMCTFirstLSN >= nonSharpCheckpointTargetLSN) {
- nonSharpCheckpointSucceeded = true;
- } else {
- //flush datasets with indexes behind target checkpoint LSN
-
datasetLifecycleManager.scheduleAsyncFlushForLaggingDatasets(nonSharpCheckpointTargetLSN);
- if (replicationEnabled) {
- //request remote replicas to flush lagging indexes
- IReplicationManager replicationManager =
-
txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext().getReplicationManager();
- try {
-
replicationManager.requestFlushLaggingReplicaIndexes(nonSharpCheckpointTargetLSN);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- }
-
- CheckpointObject checkpointObject = new
CheckpointObject(logMgr.getAppendLSN(), minMCTFirstLSN,
- txnMgr.getMaxJobId(), System.currentTimeMillis(),
isSharpCheckpoint);
-
- String fileName = getCheckpointFileName(logDir,
Long.toString(checkpointObject.getTimeStamp()));
-
- try (FileOutputStream fos = new FileOutputStream(fileName);
- ObjectOutputStream oosToFos = new ObjectOutputStream(fos)) {
- oosToFos.writeObject(checkpointObject);
- oosToFos.flush();
- } catch (IOException e) {
- throw new ACIDException("Failed to checkpoint", e);
- }
-
- //#. delete the previous checkpoint files
- if (prevCheckpointFiles != null) {
- // sort the filenames lexicographically to keep the latest
checkpointHistory files.
- Arrays.sort(prevCheckpointFiles);
- for (int i = 0; i < prevCheckpointFiles.length -
this.checkpointHistory; ++i) {
- prevCheckpointFiles[i].delete();
- }
- }
-
- if (isSharpCheckpoint) {
- try {
- if (minMCTFirstLSN == SHARP_CHECKPOINT_LSN) {
- logMgr.renewLogFiles();
- } else {
- logMgr.deleteOldLogFiles(minMCTFirstLSN);
- }
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
-
- if (nonSharpCheckpointSucceeded) {
- logMgr.deleteOldLogFiles(minMCTFirstLSN);
- }
-
- if (isSharpCheckpoint) {
- LOGGER.info("Completed sharp checkpoint.");
- }
-
- //return the min LSN that was recorded in the checkpoint
- return minMCTFirstLSN;
- }
-
- @Override
public long getMinFirstLSN() throws HyracksDataException {
long minFirstLSN = getLocalMinFirstLSN();
@@ -559,16 +418,16 @@
@Override
public long getLocalMinFirstLSN() throws HyracksDataException {
- IDatasetLifecycleManager datasetLifecycleManager =
-
txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager();
+ IDatasetLifecycleManager datasetLifecycleManager =
txnSubsystem.getAsterixAppRuntimeContextProvider()
+ .getDatasetLifecycleManager();
List<IIndex> openIndexList =
datasetLifecycleManager.getOpenResources();
long firstLSN;
//the min first lsn can only be the current append or smaller
long minFirstLSN = logMgr.getAppendLSN();
if (openIndexList.size() > 0) {
for (IIndex index : openIndexList) {
- AbstractLSMIOOperationCallback ioCallback =
- (AbstractLSMIOOperationCallback) ((ILSMIndex)
index).getIOOperationCallback();
+ AbstractLSMIOOperationCallback ioCallback =
(AbstractLSMIOOperationCallback) ((ILSMIndex) index)
+ .getIOOperationCallback();
if (!((AbstractLSMIndex)
index).isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush()) {
firstLSN = ioCallback.getFirstLSN();
@@ -580,58 +439,10 @@
}
private long getRemoteMinFirstLSN() {
- IReplicaResourcesManager remoteResourcesManager =
-
txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext().getReplicaResourcesManager();
+ IReplicaResourcesManager remoteResourcesManager =
txnSubsystem.getAsterixAppRuntimeContextProvider()
+ .getAppContext().getReplicaResourcesManager();
long minRemoteLSN =
remoteResourcesManager.getPartitionsMinLSN(localResourceRepository.getInactivePartitions());
return minRemoteLSN;
- }
-
- private CheckpointObject readCheckpoint() throws ACIDException,
FileNotFoundException {
- CheckpointObject checkpointObject = null;
-
- //read all checkpointObjects from the existing checkpoint files
- File[] prevCheckpointFiles = getPreviousCheckpointFiles();
- if (prevCheckpointFiles == null || prevCheckpointFiles.length == 0) {
- throw new FileNotFoundException("Checkpoint file is not found");
- }
-
- List<CheckpointObject> checkpointObjectList = new ArrayList<>();
- for (File file : prevCheckpointFiles) {
- try (FileInputStream fis = new FileInputStream(file);
- ObjectInputStream oisFromFis = new ObjectInputStream(fis))
{
- checkpointObject = (CheckpointObject) oisFromFis.readObject();
- checkpointObjectList.add(checkpointObject);
- } catch (Exception e) {
- throw new ACIDException("Failed to read a checkpoint file", e);
- }
- }
-
- //sort checkpointObjects in descending order by timeStamp to find out
the most recent one.
- Collections.sort(checkpointObjectList);
-
- //return the most recent one (the first one in sorted list)
- return checkpointObjectList.get(0);
- }
-
- private File[] getPreviousCheckpointFiles() {
- String logDir = ((LogManager)
txnSubsystem.getLogManager()).getLogManagerProperties().getLogDir();
- File parentDir = new File(logDir);
-
- FilenameFilter filter = new FilenameFilter() {
- @Override
- public boolean accept(File dir, String name) {
- return name.contains(CHECKPOINT_FILENAME_PREFIX);
- }
- };
-
- return parentDir.listFiles(filter);
- }
-
- private static String getCheckpointFileName(String baseDir, String suffix)
{
- if (!baseDir.endsWith(System.getProperty("file.separator"))) {
- baseDir += System.getProperty("file.separator");
- }
- return baseDir + CHECKPOINT_FILENAME_PREFIX + suffix;
}
@Override
@@ -794,8 +605,8 @@
//undo loserTxn's effect
LOGGER.log(Level.INFO, "undoing loser transaction's effect");
- IDatasetLifecycleManager datasetLifecycleManager =
-
txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager();
+ IDatasetLifecycleManager datasetLifecycleManager =
txnSubsystem.getAsterixAppRuntimeContextProvider()
+ .getDatasetLifecycleManager();
//TODO sort loser entities by smallest LSN to undo in one pass.
Iterator<Entry<TxnId, List<Long>>> iter =
jobLoserEntity2LSNsMap.entrySet().iterator();
int undoCount = 0;
@@ -836,12 +647,8 @@
@Override
public void stop(boolean dumpState, OutputStream os) throws IOException {
- try {
- checkpoint(true, NON_SHARP_CHECKPOINT_TARGET_LSN);
- } catch (HyracksDataException | ACIDException e) {
- e.printStackTrace();
- throw new IOException(e);
- }
+ // Shutdown checkpoint
+ checkpointManager.doSharpCheckpoint();
}
@Override
@@ -851,10 +658,10 @@
private static void undo(ILogRecord logRecord, IDatasetLifecycleManager
datasetLifecycleManager) {
try {
- ILSMIndex index =
- (ILSMIndex)
datasetLifecycleManager.getIndex(logRecord.getDatasetId(),
logRecord.getResourceId());
- ILSMIndexAccessor indexAccessor =
- index.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
+ ILSMIndex index = (ILSMIndex)
datasetLifecycleManager.getIndex(logRecord.getDatasetId(),
+ logRecord.getResourceId());
+ ILSMIndexAccessor indexAccessor =
index.createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
if (logRecord.getNewOp() == IndexOperation.INSERT.ordinal()) {
indexAccessor.forceDelete(logRecord.getNewValue());
} else if (logRecord.getNewOp() ==
IndexOperation.DELETE.ordinal()) {
@@ -871,10 +678,9 @@
try {
int datasetId = logRecord.getDatasetId();
long resourceId = logRecord.getResourceId();
- ILSMIndex index =
- (ILSMIndex) datasetLifecycleManager.getIndex(datasetId,
resourceId);
- ILSMIndexAccessor indexAccessor =
- index.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
+ ILSMIndex index = (ILSMIndex)
datasetLifecycleManager.getIndex(datasetId, resourceId);
+ ILSMIndexAccessor indexAccessor =
index.createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
if (logRecord.getNewOp() == IndexOperation.INSERT.ordinal()) {
indexAccessor.forceInsert(logRecord.getNewValue());
} else if (logRecord.getNewOp() ==
IndexOperation.DELETE.ordinal()) {
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/ReplicationCheckpointManager.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/ReplicationCheckpointManager.java
new file mode 100644
index 0000000..1e67599
--- /dev/null
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/ReplicationCheckpointManager.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.recovery;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.config.AsterixMetadataProperties;
+import org.apache.asterix.common.config.IAsterixPropertiesProvider;
+import org.apache.asterix.common.replication.IReplicaResourcesManager;
+import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.asterix.common.transactions.CheckpointProperties;
+import org.apache.asterix.common.transactions.ICheckpointManager;
+import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * An implementation of {@link ICheckpointManager} that defines the logic
+ * of checkpoints when replication is enabled.
+ */
+public class ReplicationCheckpointManager extends AbstractCheckpointManager {
+
+    private static final Logger LOGGER = Logger.getLogger(ReplicationCheckpointManager.class.getName());
+
+    public ReplicationCheckpointManager(ITransactionSubsystem txnSubsystem,
+            CheckpointProperties checkpointProperties) {
+        super(txnSubsystem, checkpointProperties);
+    }
+
+    /**
+     * Performs a sharp checkpoint. All datasets are flushed and all transaction
+     * log files are deleted except the files that are needed for dead replicas.
+     *
+     * @throws HyracksDataException if flushing, capturing the checkpoint or
+     *             cleaning up log files fails
+     */
+    @Override
+    public synchronized void doSharpCheckpoint() throws HyracksDataException {
+        LOGGER.info("Starting sharp checkpoint...");
+        final IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+                .getDatasetLifecycleManager();
+        datasetLifecycleManager.flushAllDatasets();
+        long minFirstLSN;
+        // If shutting down, need to check if we need to keep any remote logs for dead replicas
+        if (txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext().isShuttingdown()) {
+            final Set<String> deadReplicaIds = txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext()
+                    .getReplicationManager().getDeadReplicasIds();
+            if (deadReplicaIds.isEmpty()) {
+                // No dead replicas => no need to keep any log
+                minFirstLSN = SHARP_CHECKPOINT_LSN;
+            } else {
+                // Get min LSN of dead replicas remote resources
+                minFirstLSN = getDeadReplicasMinFirstLSN(deadReplicaIds);
+            }
+        } else {
+            // Start up complete checkpoint. Avoid deleting remote recovery logs.
+            minFirstLSN = txnSubsystem.getRecoveryManager().getMinFirstLSN();
+        }
+        capture(minFirstLSN, true);
+        if (minFirstLSN == SHARP_CHECKPOINT_LSN) {
+            // No need to keep any logs
+            txnSubsystem.getLogManager().renewLogFiles();
+        } else {
+            // Delete only log files with LSNs < any dead replica partition minimum LSN
+            txnSubsystem.getLogManager().deleteOldLogFiles(minFirstLSN);
+        }
+        LOGGER.info("Completed sharp checkpoint.");
+    }
+
+    /**
+     * Attempts to perform a soft checkpoint at the specified {@code checkpointTargetLSN}.
+     * If the current minimum first LSN is below {@code checkpointTargetLSN}, an asynchronous
+     * flush is scheduled for the lagging local datasets and the replication manager is asked
+     * to request a flush of lagging replica indexes. A checkpoint is captured in either case.
+     * When the target was reached, old transaction log files are deleted using the
+     * captured minimum first LSN.
+     *
+     * @param checkpointTargetLSN the LSN the checkpoint should reach
+     * @return the minimum first LSN that was recorded in the checkpoint
+     * @throws HyracksDataException if the checkpoint fails, including when the flush
+     *             request to the replicas fails with an {@link IOException}
+     */
+    @Override
+    public synchronized long tryCheckpoint(long checkpointTargetLSN) throws HyracksDataException {
+        LOGGER.info("Attempting soft checkpoint...");
+        final long minFirstLSN = txnSubsystem.getRecoveryManager().getMinFirstLSN();
+        final boolean checkpointSucceeded = minFirstLSN >= checkpointTargetLSN;
+        if (!checkpointSucceeded) {
+            // Flush datasets with indexes behind target checkpoint LSN
+            final IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem
+                    .getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager();
+            datasetLifecycleManager.scheduleAsyncFlushForLaggingDatasets(checkpointTargetLSN);
+            // Request remote replicas to flush lagging indexes
+            final IReplicationManager replicationManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+                    .getAppContext().getReplicationManager();
+            try {
+                replicationManager.requestFlushLaggingReplicaIndexes(checkpointTargetLSN);
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+        capture(minFirstLSN, false);
+        if (checkpointSucceeded) {
+            txnSubsystem.getLogManager().deleteOldLogFiles(minFirstLSN);
+            LOGGER.info(String.format("soft checkpoint succeeded at LSN(%s)", minFirstLSN));
+        }
+        return minFirstLSN;
+    }
+
+    /**
+     * Computes the minimum LSN over the partitions of the given dead replicas
+     * that are not active on this node.
+     *
+     * @param deadReplicaIds node ids of the dead replicas
+     * @return the value reported by the replica resources manager for those partitions
+     */
+    private long getDeadReplicasMinFirstLSN(Set<String> deadReplicaIds) {
+        final IReplicaResourcesManager remoteResourcesManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+                .getAppContext().getReplicaResourcesManager();
+        final IAsterixPropertiesProvider propertiesProvider = (IAsterixPropertiesProvider) txnSubsystem
+                .getAsterixAppRuntimeContextProvider().getAppContext();
+        final AsterixMetadataProperties metadataProperties = propertiesProvider.getMetadataProperties();
+        final PersistentLocalResourceRepository localResourceRepository =
+                (PersistentLocalResourceRepository) txnSubsystem.getAsterixAppRuntimeContextProvider()
+                        .getLocalResourceRepository();
+        // Get partitions of the dead replicas that are not active on this node
+        final Set<Integer> deadReplicasPartitions = new HashSet<>();
+        for (String deadReplicaId : deadReplicaIds) {
+            final ClusterPartition[] nodePartitons = metadataProperties.getNodePartitions().get(deadReplicaId);
+            for (ClusterPartition partition : nodePartitons) {
+                if (!localResourceRepository.getActivePartitions().contains(partition.getPartitionId())) {
+                    deadReplicasPartitions.add(partition.getPartitionId());
+                }
+            }
+        }
+        return remoteResourcesManager.getPartitionsMinLSN(deadReplicasPartitions);
+    }
+}
\ No newline at end of file
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
index f035029..b08ecbb 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -145,6 +145,7 @@
}
}
+ /**
+ * Returns the value currently held by {@code maxJobId}.
+ */
+ @Override
public int getMaxJobId() {
return maxJobId.get();
}
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
index 5f06b8a..804373a 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
@@ -27,16 +27,20 @@
import org.apache.asterix.common.config.ClusterProperties;
import org.apache.asterix.common.config.IAsterixPropertiesProvider;
import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.Checkpoint;
+import org.apache.asterix.common.transactions.CheckpointProperties;
import
org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
+import org.apache.asterix.common.transactions.ICheckpointManager;
import org.apache.asterix.common.transactions.ILockManager;
import org.apache.asterix.common.transactions.ILogManager;
import org.apache.asterix.common.transactions.IRecoveryManager;
import org.apache.asterix.common.transactions.ITransactionManager;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.asterix.common.utils.StorageConstants;
import
org.apache.asterix.transaction.management.service.locking.ConcurrentLockManager;
import org.apache.asterix.transaction.management.service.logging.LogManager;
import
org.apache.asterix.transaction.management.service.logging.LogManagerWithReplication;
-import
org.apache.asterix.transaction.management.service.recovery.CheckpointThread;
+import
org.apache.asterix.transaction.management.service.recovery.CheckpointManagerFactory;
import
org.apache.asterix.transaction.management.service.recovery.RecoveryManager;
/**
@@ -50,8 +54,8 @@
private final ITransactionManager transactionManager;
private final IRecoveryManager recoveryManager;
private final IAsterixAppRuntimeContextProvider
asterixAppRuntimeContextProvider;
- private final CheckpointThread checkpointThread;
private final AsterixTransactionProperties txnProperties;
+ private final ICheckpointManager checkpointManager;
//for profiling purpose
public static final boolean IS_PROFILE_MODE = false;//true
@@ -66,6 +70,15 @@
this.txnProperties = txnProperties;
this.transactionManager = new TransactionManager(this);
this.lockManager = new
ConcurrentLockManager(txnProperties.getLockManagerShrinkTimer());
+ final boolean replicationEnabled =
ClusterProperties.INSTANCE.isReplicationEnabled();
+ final CheckpointProperties checkpointProperties = new
CheckpointProperties(txnProperties, id);
+ checkpointManager = CheckpointManagerFactory.create(this,
checkpointProperties, replicationEnabled);
+ final Checkpoint latestCheckpoint = checkpointManager.getLatest();
+ // Fail fast when the latest checkpoint was written with a different storage version
+ if (latestCheckpoint != null && latestCheckpoint.getStorageVersion() != StorageConstants.VERSION) {
+ throw new IllegalStateException(
+ String.format("Storage version mismatch. Current version (%s). On disk version: (%s)",
+ StorageConstants.VERSION, latestCheckpoint.getStorageVersion()));
+ }
AsterixReplicationProperties asterixReplicationProperties = null;
if (asterixAppRuntimeContextProvider != null) {
@@ -73,21 +86,12 @@
.getAppContext()).getReplicationProperties();
}
- if (asterixReplicationProperties != null &&
ClusterProperties.INSTANCE.isReplicationEnabled()) {
+ if (asterixReplicationProperties != null && replicationEnabled) {
this.logManager = new LogManagerWithReplication(this);
} else {
this.logManager = new LogManager(this);
}
-
this.recoveryManager = new RecoveryManager(this);
-
- if (asterixAppRuntimeContextProvider != null) {
- this.checkpointThread = new CheckpointThread(recoveryManager,
logManager,
- this.txnProperties.getCheckpointLSNThreshold(),
this.txnProperties.getCheckpointPollFrequency());
- this.checkpointThread.start();
- } else {
- this.checkpointThread = null;
- }
if (IS_PROFILE_MODE) {
ecp = new EntityCommitProfiler(this,
this.txnProperties.getCommitProfilerReportInterval());
@@ -133,6 +137,11 @@
++profilerEntityCommitLogCount;
}
+ /**
+ * Returns the checkpoint manager held by this transaction subsystem.
+ */
+ @Override
+ public ICheckpointManager getCheckpointManager() {
+ return checkpointManager;
+ }
+
/**
* Thread for profiling entity level commit count
* This thread takes a report interval (in seconds) parameter and
--
To view, visit https://asterix-gerrit.ics.uci.edu/1380
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I36c00ca195b93bbe1e53f39bb4a3b5a344657f0d
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <[email protected]>