Repository: asterixdb Updated Branches: refs/heads/master af7702128 -> aa00bf69d
Introduce CheckpointManager API This change includes the following: - s/CheckpointObject/Checkpoint - Add AsterixDB storage version to checkpoints. - Prevent any txn log access when a storage version mismatch is detected. - Introduce CheckpointManager API and CheckpointProperties. - Properly stop checkpointing thread on instance shutdown. - Separate checkpointing logic when replication enabled/disabled. Change-Id: I36c00ca195b93bbe1e53f39bb4a3b5a344657f0d Reviewed-on: https://asterix-gerrit.ics.uci.edu/1380 Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> BAD: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Murtadha Hubail <hubail...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/aa00bf69 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/aa00bf69 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/aa00bf69 Branch: refs/heads/master Commit: aa00bf69d78afa4c8227f66530f8d81384148f47 Parents: af77021 Author: Murtadha Hubail <mhub...@uci.edu> Authored: Sat Dec 17 10:54:46 2016 +0300 Committer: Murtadha Hubail <hubail...@gmail.com> Committed: Sat Dec 17 09:01:35 2016 -0800 ---------------------------------------------------------------------- .../asterix/app/nc/NCAppRuntimeContext.java | 2 +- .../bootstrap/NCApplicationEntryPoint.java | 6 +- .../asterix/test/logging/CheckpointingTest.java | 6 +- .../asterix/common/transactions/Checkpoint.java | 110 ++++++++ .../transactions/CheckpointProperties.java | 53 ++++ .../common/transactions/ICheckpointManager.java | 49 ++++ .../common/transactions/ILogManager.java | 5 + .../common/transactions/IRecoveryManager.java | 14 -- .../transactions/ITransactionManager.java | 5 + .../transactions/ITransactionSubsystem.java | 2 +- .../asterix/common/utils/StorageConstants.java | 9 +- 
.../management/service/logging/LogManager.java | 3 +- .../recovery/AbstractCheckpointManager.java | 163 ++++++++++++ .../service/recovery/CheckpointManager.java | 80 ++++++ .../recovery/CheckpointManagerFactory.java | 39 +++ .../service/recovery/CheckpointObject.java | 78 ------ .../service/recovery/CheckpointThread.java | 38 +-- .../service/recovery/RecoveryManager.java | 252 +++---------------- .../recovery/ReplicationCheckpointManager.java | 143 +++++++++++ .../service/transaction/TransactionManager.java | 1 + .../transaction/TransactionSubsystem.java | 37 +-- 21 files changed, 741 insertions(+), 354 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aa00bf69/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java index a3ae9a0..b1ca062 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java @@ -278,7 +278,7 @@ public class NCAppRuntimeContext implements IAppRuntimeContext, IPropertiesProvi lccm.register((ILifeCycleComponent) datasetLifecycleManager); lccm.register((ILifeCycleComponent) txnSubsystem.getTransactionManager()); lccm.register((ILifeCycleComponent) txnSubsystem.getLockManager()); - + lccm.register(txnSubsystem.getCheckpointManager()); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aa00bf69/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java ---------------------------------------------------------------------- diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java index 1887f2e..8998c6b 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java @@ -37,6 +37,7 @@ import org.apache.asterix.common.config.ClusterProperties; import org.apache.asterix.common.config.IPropertiesProvider; import org.apache.asterix.common.config.MessagingProperties; import org.apache.asterix.common.replication.IRemoteRecoveryManager; +import org.apache.asterix.common.transactions.ICheckpointManager; import org.apache.asterix.common.transactions.IRecoveryManager; import org.apache.asterix.common.transactions.IRecoveryManager.SystemState; import org.apache.asterix.common.utils.PrintUtil; @@ -47,7 +48,6 @@ import org.apache.asterix.messaging.MessagingChannelInterfaceFactory; import org.apache.asterix.messaging.NCMessageBroker; import org.apache.asterix.runtime.message.ReportMaxResourceIdMessage; import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; -import org.apache.asterix.transaction.management.service.recovery.RecoveryManager; import org.apache.commons.io.FileUtils; import org.apache.hyracks.api.application.INCApplicationContext; import org.apache.hyracks.api.application.INCApplicationEntryPoint; @@ -249,8 +249,8 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint { lccm.startAll(); if (!pendingFailbackCompletion) { - IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager(); - recoveryMgr.checkpoint(true, RecoveryManager.NON_SHARP_CHECKPOINT_TARGET_LSN); + ICheckpointManager checkpointMgr = runtimeContext.getTransactionSubsystem().getCheckpointManager(); + 
checkpointMgr.doSharpCheckpoint(); if (isMetadataNode) { runtimeContext.exportMetadataNodeStub(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aa00bf69/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java index deb53ad..9fbf850 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java @@ -30,6 +30,7 @@ import org.apache.asterix.common.configuration.AsterixConfiguration; import org.apache.asterix.common.configuration.Property; import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable; import org.apache.asterix.common.transactions.DatasetId; +import org.apache.asterix.common.transactions.ICheckpointManager; import org.apache.asterix.common.transactions.IRecoveryManager; import org.apache.asterix.common.transactions.ITransactionContext; import org.apache.asterix.external.util.DataflowUtils; @@ -125,6 +126,7 @@ public class CheckpointingTest { FrameTupleAppender tupleAppender = new FrameTupleAppender(frame); IRecoveryManager recoveryManager = nc.getTransactionSubsystem().getRecoveryManager(); + ICheckpointManager checkpointManager = nc.getTransactionSubsystem().getCheckpointManager(); LogManager logManager = (LogManager) nc.getTransactionSubsystem().getLogManager(); // Number of log files after node startup should be one int numberOfLogFiles = logManager.getLogFileIds().size(); @@ -154,7 +156,7 @@ public class CheckpointingTest { * recovery) */ int numberOfLogFilesBeforeCheckpoint = logManager.getLogFileIds().size(); - recoveryManager.checkpoint(false, logManager.getAppendLSN()); + 
checkpointManager.tryCheckpoint(logManager.getAppendLSN()); int numberOfLogFilesAfterCheckpoint = logManager.getLogFileIds().size(); Assert.assertEquals(numberOfLogFilesBeforeCheckpoint, numberOfLogFilesAfterCheckpoint); @@ -175,7 +177,7 @@ public class CheckpointingTest { * At this point, the low-water mark is not in the initialLowWaterMarkFileId, so * a checkpoint should delete it. */ - recoveryManager.checkpoint(false, recoveryManager.getMinFirstLSN()); + checkpointManager.tryCheckpoint(recoveryManager.getMinFirstLSN()); // Validate initialLowWaterMarkFileId was deleted for (Long fileId : logManager.getLogFileIds()) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aa00bf69/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java new file mode 100644 index 0000000..8bbdab7 --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.transactions; + +import java.io.Serializable; + +public class Checkpoint implements Serializable, Comparable<Checkpoint> { + + private static final long serialVersionUID = 1L; + + private final long checkpointLsn; + private final long minMCTFirstLsn; + private final int maxJobId; + private final long timeStamp; + private final boolean sharp; + private final int storageVersion; + + public Checkpoint(long checkpointLsn, long minMCTFirstLsn, int maxJobId, long timeStamp, boolean sharp, + int storageVersion) { + this.checkpointLsn = checkpointLsn; + this.minMCTFirstLsn = minMCTFirstLsn; + this.maxJobId = maxJobId; + this.timeStamp = timeStamp; + this.sharp = sharp; + this.storageVersion = storageVersion; + } + + public long getCheckpointLsn() { + return checkpointLsn; + } + + public long getMinMCTFirstLsn() { + return minMCTFirstLsn; + } + + public int getMaxJobId() { + return maxJobId; + } + + public long getTimeStamp() { + return timeStamp; + } + + public boolean isSharp() { + return sharp; + } + + public int getStorageVersion() { + return storageVersion; + } + + @Override + public int compareTo(Checkpoint checkpoint) { + long compareTimeStamp = checkpoint.getTimeStamp(); + + // Descending order + long diff = compareTimeStamp - this.timeStamp; + if (diff > 0) { + return 1; + } else if (diff == 0) { + return 0; + } else { + return -1; + } + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (!(obj instanceof Checkpoint)) { + return false; + } + Checkpoint other = (Checkpoint) obj; + return compareTo(other) == 0; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + (int) (checkpointLsn ^ (checkpointLsn >>> 32)); + result = prime * result + maxJobId; + result = prime * result + 
(int) (minMCTFirstLsn ^ (minMCTFirstLsn >>> 32)); + result = prime * result + (sharp ? 1231 : 1237); + result = prime * result + storageVersion; + result = prime * result + (int) (timeStamp ^ (timeStamp >>> 32)); + return result; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aa00bf69/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/CheckpointProperties.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/CheckpointProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/CheckpointProperties.java new file mode 100644 index 0000000..b8af3a6 --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/CheckpointProperties.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.common.transactions; + +import org.apache.asterix.common.config.TransactionProperties; + +public class CheckpointProperties { + + private final String checkpointDirPath; + private final int lsnThreshold; + private final int pollFrequency; + private final int historyToKeep; + + public CheckpointProperties(TransactionProperties txnProperties, String nodeId) { + // Currently we use the log files directory for checkpoints + checkpointDirPath = txnProperties.getLogDirectory(nodeId); + lsnThreshold = txnProperties.getCheckpointLSNThreshold(); + pollFrequency = txnProperties.getCheckpointPollFrequency(); + historyToKeep = txnProperties.getCheckpointHistory(); + } + + public int getLsnThreshold() { + return lsnThreshold; + } + + public int getPollFrequency() { + return pollFrequency; + } + + public int getHistoryToKeep() { + return historyToKeep; + } + + public String getCheckpointDirPath() { + return checkpointDirPath; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aa00bf69/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java new file mode 100644 index 0000000..9e7eb0d --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.transactions; + +import org.apache.asterix.common.exceptions.ACIDException; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.lifecycle.ILifeCycleComponent; + +public interface ICheckpointManager extends ILifeCycleComponent { + + /** + * @return The latest checkpoint on disk if any exists. Otherwise null. + * @throws ACIDException + * when a checkpoint file cannot be read. + */ + Checkpoint getLatest() throws ACIDException; + + /** + * Performs a sharp checkpoint. + * + * @throws HyracksDataException + */ + void doSharpCheckpoint() throws HyracksDataException; + + /** + * Attempts to perform a soft checkpoint at the specified {@code checkpointTargetLSN}. + * + * @param checkpointTargetLSN + * @return The LSN recorded on the captured checkpoint. 
+ * @throws HyracksDataException + */ + long tryCheckpoint(long checkpointTargetLSN) throws HyracksDataException; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aa00bf69/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java index 97d4897..aa018ba 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java @@ -105,4 +105,9 @@ public interface ILogManager { * @throws IOException */ public void closeLogFile(TxnLogFile logFileRef, FileChannel fileChannel) throws IOException; + + /** + * Deletes all current log files and start the next log file partition + */ + void renewLogFiles(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aa00bf69/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java index a3115e7..6816116 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java @@ -82,20 +82,6 @@ public interface IRecoveryManager { public void rollbackTransaction(ITransactionContext txnContext) throws ACIDException; /** - * Makes a system checkpoint. 
- * - * @param isSharpCheckpoint - * a flag indicating whether to perform a sharp or non-sharp checkpoint. - * @param nonSharpCheckpointTargetLSN - * if a non-sharp checkpoint to be performed, what is the minimum LSN it should target. - * @return the LSN at which the checkpoint was performed. - * @throws ACIDException - * @throws HyracksDataException - */ - public long checkpoint(boolean isSharpCheckpoint, long nonSharpCheckpointTargetLSN) - throws ACIDException, HyracksDataException; - - /** * @return min first LSN of the open indexes (including remote indexes if replication is enabled) * @throws HyracksDataException */ http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aa00bf69/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java index c81faf0..af056ae 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java @@ -124,4 +124,9 @@ public interface ITransactionManager { */ public ITransactionSubsystem getTransactionProvider(); + /** + * @return The current max job id. 
+ */ + int getMaxJobId(); + } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aa00bf69/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionSubsystem.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionSubsystem.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionSubsystem.java index dc1e6ed..b3a3eba 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionSubsystem.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionSubsystem.java @@ -18,7 +18,6 @@ */ package org.apache.asterix.common.transactions; - public interface ITransactionSubsystem { public ILogManager getLogManager(); @@ -33,4 +32,5 @@ public interface ITransactionSubsystem { public String getId(); + public ICheckpointManager getCheckpointManager(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aa00bf69/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java index 382c94b..a885e93 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java @@ -18,12 +18,19 @@ */ package org.apache.asterix.common.utils; +import org.apache.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame; + /** * A static class that stores storage constants */ public class StorageConstants { public static final String METADATA_ROOT = "root_metadata"; + /** The storage version 
of AsterixDB related artifacts (e.g. log files, checkpoint files, etc..). */ + private static final int LOCAL_STORAGE_VERSION = 1; + + /** The storage version of AsterixDB stack. */ + public static final int VERSION = LOCAL_STORAGE_VERSION + ITreeIndexMetaDataFrame.VERSION; private StorageConstants() { } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aa00bf69/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java index 57d5c39..25096f6 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java @@ -351,7 +351,8 @@ public class LogManager implements ILogManager, ILifeCycleComponent { return logFileSize * fileId + offset; } - public void renewLogFiles() throws IOException { + @Override + public void renewLogFiles() { terminateLogFlusher(); long lastMaxLogFileId = deleteAllLogFiles(); initializeLogManager(lastMaxLogFileId + 1); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aa00bf69/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java new file mode 100644 index 0000000..0b86ea5 --- /dev/null +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.transaction.management.service.recovery; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.FilenameFilter; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.logging.Logger; + +import org.apache.asterix.common.exceptions.ACIDException; +import org.apache.asterix.common.transactions.Checkpoint; +import org.apache.asterix.common.transactions.CheckpointProperties; +import org.apache.asterix.common.transactions.ICheckpointManager; +import org.apache.asterix.common.transactions.ILogManager; +import org.apache.asterix.common.transactions.ITransactionManager; +import org.apache.asterix.common.transactions.ITransactionSubsystem; +import org.apache.asterix.common.utils.StorageConstants; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +/** + * An abstract implementation of {@link ICheckpointManager}. + * The AbstractCheckpointManager contains the implementation of + * the base operations on checkpoints such as persisting and deleting them. 
+ */ +public abstract class AbstractCheckpointManager implements ICheckpointManager { + + private static final Logger LOGGER = Logger.getLogger(AbstractCheckpointManager.class.getName()); + private static final String CHECKPOINT_FILENAME_PREFIX = "checkpoint_"; + public static final long SHARP_CHECKPOINT_LSN = -1; + private static final FilenameFilter filter = (File dir, String name) -> name.startsWith(CHECKPOINT_FILENAME_PREFIX); + private final File checkpointDir; + private final int historyToKeep; + private final int lsnThreshold; + private final int pollFrequency; + protected final ITransactionSubsystem txnSubsystem; + private CheckpointThread checkpointer; + + public AbstractCheckpointManager(ITransactionSubsystem txnSubsystem, CheckpointProperties checkpointProperties) { + this.txnSubsystem = txnSubsystem; + String checkpointDirPath = checkpointProperties.getCheckpointDirPath(); + if (!checkpointDirPath.endsWith(File.separator)) { + checkpointDirPath += File.separator; + } + checkpointDir = new File(checkpointDirPath); + // Create the checkpoint directory if missing + if (!checkpointDir.exists()) { + (new File(checkpointDirPath)).mkdir(); + } + lsnThreshold = checkpointProperties.getLsnThreshold(); + pollFrequency = checkpointProperties.getPollFrequency(); + // We must keep at least the latest checkpoint + historyToKeep = checkpointProperties.getHistoryToKeep() == 0 ? 
1 : checkpointProperties.getHistoryToKeep(); + } + + @Override + public Checkpoint getLatest() throws ACIDException { + // Read all checkpointObjects from the existing checkpoint files + File[] checkpoints = checkpointDir.listFiles(filter); + if (checkpoints == null || checkpoints.length == 0) { + return null; + } + + Checkpoint checkpointObject; + List<Checkpoint> checkpointObjectList = new ArrayList<>(); + for (File file : checkpoints) { + try (FileInputStream fis = new FileInputStream(file); + ObjectInputStream oisFromFis = new ObjectInputStream(fis)) { + checkpointObject = (Checkpoint) oisFromFis.readObject(); + checkpointObjectList.add(checkpointObject); + } catch (IOException | ClassNotFoundException e) { + throw new ACIDException("Failed to read a checkpoint file", e); + } + } + // Sort checkpointObjects in descending order by timeStamp to find out the most recent one. + Collections.sort(checkpointObjectList); + + // Return the most recent one (the first one in sorted list) + return checkpointObjectList.get(0); + } + + @Override + public void start() { + checkpointer = new CheckpointThread(this, txnSubsystem.getLogManager(), lsnThreshold, pollFrequency); + checkpointer.start(); + } + + @Override + public void stop(boolean dumpState, OutputStream ouputStream) throws IOException { + checkpointer.shutdown(); + checkpointer.interrupt(); + try { + // Wait until checkpoint thread stops + checkpointer.join(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + @Override + public void dumpState(OutputStream os) throws IOException { + // Nothing to dump + } + + protected void capture(long minMCTFirstLSN, boolean sharp) throws HyracksDataException { + ILogManager logMgr = txnSubsystem.getLogManager(); + ITransactionManager txnMgr = txnSubsystem.getTransactionManager(); + Checkpoint checkpointObject = new Checkpoint(logMgr.getAppendLSN(), minMCTFirstLSN, txnMgr.getMaxJobId(), + System.currentTimeMillis(), sharp, 
StorageConstants.VERSION); + persist(checkpointObject); + cleanup(); + } + + private void persist(Checkpoint checkpoint) throws HyracksDataException { + // Construct checkpoint file name + String fileName = checkpointDir.getAbsolutePath() + File.separator + CHECKPOINT_FILENAME_PREFIX + + Long.toString(checkpoint.getTimeStamp()); + //TODO: replace java serialization + // Write checkpoint file to disk + try (FileOutputStream fos = new FileOutputStream(fileName); + ObjectOutputStream oosToFos = new ObjectOutputStream(fos)) { + oosToFos.writeObject(checkpoint); + oosToFos.flush(); + } catch (IOException e) { + throw new HyracksDataException("Failed to write checkpoint to disk", e); + } + } + + private void cleanup() { + File[] checkpointFiles = checkpointDir.listFiles(filter); + // Sort the filenames lexicographically to keep the latest checkpoint history files. + Arrays.sort(checkpointFiles); + for (int i = 0; i < checkpointFiles.length - historyToKeep; i++) { + if (!checkpointFiles[i].delete()) { + LOGGER.warning("Could not delete checkpoint file at: " + checkpointFiles[i].getAbsolutePath()); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aa00bf69/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java new file mode 100644 index 0000000..ea711a5 --- /dev/null +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.transaction.management.service.recovery; + +import java.util.logging.Logger; + +import org.apache.asterix.common.api.IDatasetLifecycleManager; +import org.apache.asterix.common.transactions.CheckpointProperties; +import org.apache.asterix.common.transactions.ICheckpointManager; +import org.apache.asterix.common.transactions.ITransactionSubsystem; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +/** + * An implementation of {@link ICheckpointManager} that defines the logic + * of checkpoints. + */ +public class CheckpointManager extends AbstractCheckpointManager { + + private static final Logger LOGGER = Logger.getLogger(CheckpointManager.class.getName()); + + public CheckpointManager(ITransactionSubsystem txnSubsystem, CheckpointProperties checkpointProperties) { + super(txnSubsystem, checkpointProperties); + } + + /** + * Performs a sharp checkpoint. All datasets are flushed and all transaction + * log files are deleted. 
+ */ + @Override + public synchronized void doSharpCheckpoint() throws HyracksDataException { + LOGGER.info("Starting sharp checkpoint..."); + final IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider() + .getDatasetLifecycleManager(); + datasetLifecycleManager.flushAllDatasets(); + capture(SHARP_CHECKPOINT_LSN, true); + txnSubsystem.getLogManager().renewLogFiles(); + LOGGER.info("Completed sharp checkpoint."); + } + + /*** + * Attempts to perform a soft checkpoint at the specified {@code checkpointTargetLSN}. + * If a checkpoint cannot be captured due to datasets having LSN < {@code checkpointTargetLSN}, + * an asynchronous flush is triggered on them. When a checkpoint is successful, all transaction + * log files that end with LSN < {@code checkpointTargetLSN} are deleted. + */ + @Override + public synchronized long tryCheckpoint(long checkpointTargetLSN) throws HyracksDataException { + LOGGER.info("Attemping soft checkpoint..."); + final long minFirstLSN = txnSubsystem.getRecoveryManager().getMinFirstLSN(); + boolean checkpointSucceeded = minFirstLSN >= checkpointTargetLSN; + if (!checkpointSucceeded) { + // Flush datasets with indexes behind target checkpoint LSN + IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider() + .getDatasetLifecycleManager(); + datasetLifecycleManager.scheduleAsyncFlushForLaggingDatasets(checkpointTargetLSN); + } + capture(minFirstLSN, false); + if (checkpointSucceeded) { + txnSubsystem.getLogManager().deleteOldLogFiles(minFirstLSN); + LOGGER.info(String.format("soft checkpoint succeeded at LSN(%s)", minFirstLSN)); + } + return minFirstLSN; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aa00bf69/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManagerFactory.java ---------------------------------------------------------------------- diff 
--git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManagerFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManagerFactory.java new file mode 100644 index 0000000..68c5ce1 --- /dev/null +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManagerFactory.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.transaction.management.service.recovery; + +import org.apache.asterix.common.transactions.CheckpointProperties; +import org.apache.asterix.common.transactions.ICheckpointManager; +import org.apache.asterix.common.transactions.ITransactionSubsystem; + +public class CheckpointManagerFactory { + + private CheckpointManagerFactory() { + throw new AssertionError(); + } + + public static ICheckpointManager create(ITransactionSubsystem txnSubsystem, + CheckpointProperties checkpointProperties, boolean replicationEnabled) { + if (!replicationEnabled) { + return new CheckpointManager(txnSubsystem, checkpointProperties); + } else { + return new ReplicationCheckpointManager(txnSubsystem, checkpointProperties); + } + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aa00bf69/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointObject.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointObject.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointObject.java deleted file mode 100644 index 3356298..0000000 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointObject.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.transaction.management.service.recovery; - -import java.io.Serializable; - -public class CheckpointObject implements Serializable, Comparable<CheckpointObject> { - - private static final long serialVersionUID = 1L; - - private final long checkpointLsn; - private final long minMCTFirstLsn; - private final int maxJobId; - private final long timeStamp; - private final boolean sharp; - - public CheckpointObject(long checkpointLsn, long minMCTFirstLsn, int maxJobId, long timeStamp, boolean sharp) { - this.checkpointLsn = checkpointLsn; - this.minMCTFirstLsn = minMCTFirstLsn; - this.maxJobId = maxJobId; - this.timeStamp = timeStamp; - this.sharp = sharp; - } - - public long getCheckpointLsn() { - return checkpointLsn; - } - - public long getMinMCTFirstLsn() { - return minMCTFirstLsn; - } - - public int getMaxJobId() { - return maxJobId; - } - - public long getTimeStamp() { - return timeStamp; - } - - public boolean isSharp() { - return sharp; - } - - @Override - public int compareTo(CheckpointObject checkpointObject) { - long compareTimeStamp = checkpointObject.getTimeStamp(); - - //decending order - long diff = compareTimeStamp - this.timeStamp; - if (diff > 0) { - return 1; - } else if (diff == 0) { - return 0; - } else { - return -1; - } - - //ascending order - //return this.timeStamp - compareTimeStamp; - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aa00bf69/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java 
---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java index 851289e..39c7c98 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java @@ -18,15 +18,18 @@ */ package org.apache.asterix.transaction.management.service.recovery; -import java.io.IOError; import java.util.logging.Level; import java.util.logging.Logger; -import org.apache.asterix.common.exceptions.ACIDException; +import org.apache.asterix.common.transactions.ICheckpointManager; import org.apache.asterix.common.transactions.ILogManager; -import org.apache.asterix.common.transactions.IRecoveryManager; import org.apache.hyracks.api.exceptions.HyracksDataException; +/** + * A daemon thread that periodically attempts to perform checkpoints. + * A checkpoint attempt is made when the volume of transaction logs written + * since the last successful checkpoint exceeds a certain threshold. 
+ */ public class CheckpointThread extends Thread { private static final Logger LOGGER = Logger.getLogger(CheckpointThread.class.getName()); @@ -34,33 +37,34 @@ public class CheckpointThread extends Thread { private long checkpointTermInSecs; private final ILogManager logManager; - private final IRecoveryManager recoveryMgr; + private final ICheckpointManager checkpointManager; + private volatile boolean shouldRun = true; - public CheckpointThread(IRecoveryManager recoveryMgr, ILogManager logManager, - long lsnThreshold, long checkpointTermInSecs) { - this.recoveryMgr = recoveryMgr; + public CheckpointThread(ICheckpointManager checkpointManager, ILogManager logManager, long lsnThreshold, + long checkpointTermInSecs) { + this.checkpointManager = checkpointManager; this.logManager = logManager; this.lsnThreshold = lsnThreshold; this.checkpointTermInSecs = checkpointTermInSecs; + setDaemon(true); } @Override public void run() { - Thread.currentThread().setName("Checkpoint Thread"); - long currentCheckpointAttemptMinLSN; long lastCheckpointLSN = -1; long currentLogLSN; long targetCheckpointLSN; - while (true) { + while (shouldRun) { try { sleep(checkpointTermInSecs * 1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - //ignore } - + if (!shouldRun) { + return; + } if (lastCheckpointLSN == -1) { try { //Since the system just started up after sharp checkpoint, @@ -84,18 +88,20 @@ public class CheckpointThread extends Thread { //3. 
next time checkpoint comes, it will be able to remove log files which have end range less than current targetCheckpointLSN targetCheckpointLSN = lastCheckpointLSN + lsnThreshold; - currentCheckpointAttemptMinLSN = recoveryMgr.checkpoint(false, targetCheckpointLSN); + currentCheckpointAttemptMinLSN = checkpointManager.tryCheckpoint(targetCheckpointLSN); //checkpoint was completed at target LSN or above if (currentCheckpointAttemptMinLSN >= targetCheckpointLSN) { lastCheckpointLSN = currentCheckpointAttemptMinLSN; } - - } catch (ACIDException | HyracksDataException e) { - throw new IOError(e); + } catch (HyracksDataException e) { + LOGGER.log(Level.SEVERE, "Error during checkpoint", e); } } } } + public void shutdown() { + shouldRun = false; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aa00bf69/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java index 825e5c9..f8b6384 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java @@ -20,13 +20,9 @@ package org.apache.asterix.transaction.management.service.recovery; import java.io.File; import java.io.FileInputStream; -import java.io.FileNotFoundException; import java.io.FileOutputStream; -import java.io.FilenameFilter; import java.io.IOException; import java.io.InputStream; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; import java.io.OutputStream; import java.nio.ByteBuffer; 
import java.nio.channels.FileChannel; @@ -34,7 +30,6 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -48,15 +43,13 @@ import java.util.logging.Level; import java.util.logging.Logger; import org.apache.asterix.common.api.IDatasetLifecycleManager; -import org.apache.asterix.common.cluster.ClusterPartition; -import org.apache.asterix.common.config.MetadataProperties; import org.apache.asterix.common.config.ClusterProperties; -import org.apache.asterix.common.config.IPropertiesProvider; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback; import org.apache.asterix.common.replication.IReplicaResourcesManager; -import org.apache.asterix.common.replication.IReplicationManager; +import org.apache.asterix.common.transactions.Checkpoint; import org.apache.asterix.common.transactions.IAppRuntimeContextProvider; +import org.apache.asterix.common.transactions.ICheckpointManager; import org.apache.asterix.common.transactions.ILogReader; import org.apache.asterix.common.transactions.ILogRecord; import org.apache.asterix.common.transactions.IRecoveryManager; @@ -66,7 +59,6 @@ import org.apache.asterix.common.transactions.Resource; import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; import org.apache.asterix.transaction.management.service.logging.LogManager; import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants; -import org.apache.asterix.transaction.management.service.transaction.TransactionManager; import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem; import org.apache.commons.io.FileUtils; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -89,30 +81,22 @@ public class RecoveryManager 
implements IRecoveryManager, ILifeCycleComponent { private static final Logger LOGGER = Logger.getLogger(RecoveryManager.class.getName()); private final TransactionSubsystem txnSubsystem; private final LogManager logMgr; - private final int checkpointHistory; - private final long SHARP_CHECKPOINT_LSN = -1; private final boolean replicationEnabled; - public static final long NON_SHARP_CHECKPOINT_TARGET_LSN = -1; private static final String RECOVERY_FILES_DIR_NAME = "recovery_temp"; private Map<Integer, JobEntityCommits> jobId2WinnerEntitiesMap = null; private final long cachedEntityCommitsPerJobSize; private final PersistentLocalResourceRepository localResourceRepository; - - /** - * A file at a known location that contains the LSN of the last log record - * traversed doing a successful checkpoint. - */ - private static final String CHECKPOINT_FILENAME_PREFIX = "checkpoint_"; + private final ICheckpointManager checkpointManager; private SystemState state; public RecoveryManager(TransactionSubsystem txnSubsystem) { this.txnSubsystem = txnSubsystem; logMgr = (LogManager) txnSubsystem.getLogManager(); - checkpointHistory = txnSubsystem.getTransactionProperties().getCheckpointHistory(); replicationEnabled = ClusterProperties.INSTANCE.isReplicationEnabled(); localResourceRepository = (PersistentLocalResourceRepository) txnSubsystem.getAsterixAppRuntimeContextProvider() .getLocalResourceRepository(); cachedEntityCommitsPerJobSize = txnSubsystem.getTransactionProperties().getJobRecoveryMemorySize(); + checkpointManager = txnSubsystem.getCheckpointManager(); } /** @@ -126,10 +110,8 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { @Override public SystemState getSystemState() throws ACIDException { //read checkpoint file - CheckpointObject checkpointObject = null; - try { - checkpointObject = readCheckpoint(); - } catch (FileNotFoundException e) { + Checkpoint checkpointObject = checkpointManager.getLatest(); + if (checkpointObject == null) 
{ //The checkpoint file doesn't exist => Failure happened during NC initialization. //Retry to initialize the NC by setting the state to NEW_UNIVERSE state = SystemState.NEW_UNIVERSE; @@ -140,7 +122,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { } if (replicationEnabled) { - if (checkpointObject.getMinMCTFirstLsn() == SHARP_CHECKPOINT_LSN) { + if (checkpointObject.getMinMCTFirstLsn() == AbstractCheckpointManager.SHARP_CHECKPOINT_LSN) { //no logs exist state = SystemState.HEALTHY; return state; @@ -156,14 +138,14 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { } else { long readableSmallestLSN = logMgr.getReadableSmallestLSN(); if (logMgr.getAppendLSN() == readableSmallestLSN) { - if (checkpointObject.getMinMCTFirstLsn() != SHARP_CHECKPOINT_LSN) { + if (checkpointObject.getMinMCTFirstLsn() != AbstractCheckpointManager.SHARP_CHECKPOINT_LSN) { LOGGER.warning("Some(or all) of transaction log files are lost."); //No choice but continuing when the log files are lost. } state = SystemState.HEALTHY; return state; } else if (checkpointObject.getCheckpointLsn() == logMgr.getAppendLSN() - && checkpointObject.getMinMCTFirstLsn() == SHARP_CHECKPOINT_LSN) { + && checkpointObject.getMinMCTFirstLsn() == AbstractCheckpointManager.SHARP_CHECKPOINT_LSN) { state = SystemState.HEALTHY; return state; } else { @@ -180,7 +162,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { LOGGER.log(Level.INFO, "starting recovery ..."); long readableSmallestLSN = logMgr.getReadableSmallestLSN(); - CheckpointObject checkpointObject = readCheckpoint(); + Checkpoint checkpointObject = checkpointManager.getLatest(); long lowWaterMarkLSN = checkpointObject.getMinMCTFirstLsn(); if (lowWaterMarkLSN < readableSmallestLSN) { lowWaterMarkLSN = readableSmallestLSN; @@ -372,8 +354,8 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { //#. 
get maxDiskLastLSN ILSMIndex lsmIndex = index; try { - maxDiskLastLsn = - ((AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback()) + maxDiskLastLsn = ((AbstractLSMIOOperationCallback) lsmIndex + .getIOOperationCallback()) .getComponentLSN(lsmIndex.getImmutableComponents()); } catch (HyracksDataException e) { datasetLifecycleManager.close(localResource.getPath()); @@ -422,129 +404,6 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { } @Override - public synchronized long checkpoint(boolean isSharpCheckpoint, long nonSharpCheckpointTargetLSN) - throws ACIDException, HyracksDataException { - long minMCTFirstLSN; - boolean nonSharpCheckpointSucceeded = false; - - if (isSharpCheckpoint) { - LOGGER.log(Level.INFO, "Starting sharp checkpoint ... "); - } - - TransactionManager txnMgr = (TransactionManager) txnSubsystem.getTransactionManager(); - String logDir = logMgr.getLogManagerProperties().getLogDir(); - - //get the filename of the previous checkpoint files which are about to be deleted - //right after the new checkpoint file is written. 
- File[] prevCheckpointFiles = getPreviousCheckpointFiles(); - - IDatasetLifecycleManager datasetLifecycleManager = - txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager(); - //flush all in-memory components if it is the sharp checkpoint - if (isSharpCheckpoint) { - datasetLifecycleManager.flushAllDatasets(); - if (!replicationEnabled) { - minMCTFirstLSN = SHARP_CHECKPOINT_LSN; - } else { - //if is shutting down, need to check if we need to keep any remote logs for dead replicas - if (txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext().isShuttingdown()) { - Set<String> deadReplicaIds = txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext() - .getReplicationManager().getDeadReplicasIds(); - if (deadReplicaIds.isEmpty()) { - minMCTFirstLSN = SHARP_CHECKPOINT_LSN; - } else { - //get min LSN of dead replicas remote resources - IReplicaResourcesManager remoteResourcesManager = txnSubsystem - .getAsterixAppRuntimeContextProvider().getAppContext().getReplicaResourcesManager(); - IPropertiesProvider propertiesProvider = (IPropertiesProvider) txnSubsystem - .getAsterixAppRuntimeContextProvider().getAppContext(); - MetadataProperties metadataProperties = propertiesProvider.getMetadataProperties(); - Set<Integer> deadReplicasPartitions = new HashSet<>(); - //get partitions of the dead replicas that are not active on this node - for (String deadReplicaId : deadReplicaIds) { - ClusterPartition[] nodePartitons = - metadataProperties.getNodePartitions().get(deadReplicaId); - for (ClusterPartition partition : nodePartitons) { - if (!localResourceRepository.getActivePartitions() - .contains(partition.getPartitionId())) { - deadReplicasPartitions.add(partition.getPartitionId()); - } - } - } - minMCTFirstLSN = remoteResourcesManager.getPartitionsMinLSN(deadReplicasPartitions); - } - } else { - //start up complete checkpoint. Avoid deleting remote recovery logs. 
- minMCTFirstLSN = getMinFirstLSN(); - } - } - } else { - minMCTFirstLSN = getMinFirstLSN(); - if (minMCTFirstLSN >= nonSharpCheckpointTargetLSN) { - nonSharpCheckpointSucceeded = true; - } else { - //flush datasets with indexes behind target checkpoint LSN - datasetLifecycleManager.scheduleAsyncFlushForLaggingDatasets(nonSharpCheckpointTargetLSN); - if (replicationEnabled) { - //request remote replicas to flush lagging indexes - IReplicationManager replicationManager = - txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext().getReplicationManager(); - try { - replicationManager.requestFlushLaggingReplicaIndexes(nonSharpCheckpointTargetLSN); - } catch (IOException e) { - e.printStackTrace(); - } - } - } - } - - CheckpointObject checkpointObject = new CheckpointObject(logMgr.getAppendLSN(), minMCTFirstLSN, - txnMgr.getMaxJobId(), System.currentTimeMillis(), isSharpCheckpoint); - - String fileName = getCheckpointFileName(logDir, Long.toString(checkpointObject.getTimeStamp())); - - try (FileOutputStream fos = new FileOutputStream(fileName); - ObjectOutputStream oosToFos = new ObjectOutputStream(fos)) { - oosToFos.writeObject(checkpointObject); - oosToFos.flush(); - } catch (IOException e) { - throw new ACIDException("Failed to checkpoint", e); - } - - //#. delete the previous checkpoint files - if (prevCheckpointFiles != null) { - // sort the filenames lexicographically to keep the latest checkpointHistory files. 
- Arrays.sort(prevCheckpointFiles); - for (int i = 0; i < prevCheckpointFiles.length - this.checkpointHistory; ++i) { - prevCheckpointFiles[i].delete(); - } - } - - if (isSharpCheckpoint) { - try { - if (minMCTFirstLSN == SHARP_CHECKPOINT_LSN) { - logMgr.renewLogFiles(); - } else { - logMgr.deleteOldLogFiles(minMCTFirstLSN); - } - } catch (IOException e) { - throw new HyracksDataException(e); - } - } - - if (nonSharpCheckpointSucceeded) { - logMgr.deleteOldLogFiles(minMCTFirstLSN); - } - - if (isSharpCheckpoint) { - LOGGER.info("Completed sharp checkpoint."); - } - - //return the min LSN that was recorded in the checkpoint - return minMCTFirstLSN; - } - - @Override public long getMinFirstLSN() throws HyracksDataException { long minFirstLSN = getLocalMinFirstLSN(); @@ -559,16 +418,16 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { @Override public long getLocalMinFirstLSN() throws HyracksDataException { - IDatasetLifecycleManager datasetLifecycleManager = - txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager(); + IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider() + .getDatasetLifecycleManager(); List<IIndex> openIndexList = datasetLifecycleManager.getOpenResources(); long firstLSN; //the min first lsn can only be the current append or smaller long minFirstLSN = logMgr.getAppendLSN(); if (openIndexList.size() > 0) { for (IIndex index : openIndexList) { - AbstractLSMIOOperationCallback ioCallback = - (AbstractLSMIOOperationCallback) ((ILSMIndex) index).getIOOperationCallback(); + AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback) ((ILSMIndex) index) + .getIOOperationCallback(); if (!((AbstractLSMIndex) index).isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush()) { firstLSN = ioCallback.getFirstLSN(); @@ -580,60 +439,12 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { } private long 
getRemoteMinFirstLSN() { - IReplicaResourcesManager remoteResourcesManager = - txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext().getReplicaResourcesManager(); + IReplicaResourcesManager remoteResourcesManager = txnSubsystem.getAsterixAppRuntimeContextProvider() + .getAppContext().getReplicaResourcesManager(); long minRemoteLSN = remoteResourcesManager.getPartitionsMinLSN(localResourceRepository.getInactivePartitions()); return minRemoteLSN; } - private CheckpointObject readCheckpoint() throws ACIDException, FileNotFoundException { - CheckpointObject checkpointObject = null; - - //read all checkpointObjects from the existing checkpoint files - File[] prevCheckpointFiles = getPreviousCheckpointFiles(); - if (prevCheckpointFiles == null || prevCheckpointFiles.length == 0) { - throw new FileNotFoundException("Checkpoint file is not found"); - } - - List<CheckpointObject> checkpointObjectList = new ArrayList<>(); - for (File file : prevCheckpointFiles) { - try (FileInputStream fis = new FileInputStream(file); - ObjectInputStream oisFromFis = new ObjectInputStream(fis)) { - checkpointObject = (CheckpointObject) oisFromFis.readObject(); - checkpointObjectList.add(checkpointObject); - } catch (Exception e) { - throw new ACIDException("Failed to read a checkpoint file", e); - } - } - - //sort checkpointObjects in descending order by timeStamp to find out the most recent one. 
- Collections.sort(checkpointObjectList); - - //return the most recent one (the first one in sorted list) - return checkpointObjectList.get(0); - } - - private File[] getPreviousCheckpointFiles() { - String logDir = ((LogManager) txnSubsystem.getLogManager()).getLogManagerProperties().getLogDir(); - File parentDir = new File(logDir); - - FilenameFilter filter = new FilenameFilter() { - @Override - public boolean accept(File dir, String name) { - return name.contains(CHECKPOINT_FILENAME_PREFIX); - } - }; - - return parentDir.listFiles(filter); - } - - private static String getCheckpointFileName(String baseDir, String suffix) { - if (!baseDir.endsWith(System.getProperty("file.separator"))) { - baseDir += System.getProperty("file.separator"); - } - return baseDir + CHECKPOINT_FILENAME_PREFIX + suffix; - } - @Override public File createJobRecoveryFile(int jobId, String fileName) throws IOException { String recoveryDirPath = getRecoveryDirPath(); @@ -794,8 +605,8 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { //undo loserTxn's effect LOGGER.log(Level.INFO, "undoing loser transaction's effect"); - IDatasetLifecycleManager datasetLifecycleManager = - txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager(); + IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider() + .getDatasetLifecycleManager(); //TODO sort loser entities by smallest LSN to undo in one pass. 
Iterator<Entry<TxnId, List<Long>>> iter = jobLoserEntity2LSNsMap.entrySet().iterator(); int undoCount = 0; @@ -836,12 +647,8 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { @Override public void stop(boolean dumpState, OutputStream os) throws IOException { - try { - checkpoint(true, NON_SHARP_CHECKPOINT_TARGET_LSN); - } catch (HyracksDataException | ACIDException e) { - e.printStackTrace(); - throw new IOException(e); - } + // Shutdown checkpoint + checkpointManager.doSharpCheckpoint(); } @Override @@ -851,10 +658,10 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { private static void undo(ILogRecord logRecord, IDatasetLifecycleManager datasetLifecycleManager) { try { - ILSMIndex index = - (ILSMIndex) datasetLifecycleManager.getIndex(logRecord.getDatasetId(), logRecord.getResourceId()); - ILSMIndexAccessor indexAccessor = - index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); + ILSMIndex index = (ILSMIndex) datasetLifecycleManager.getIndex(logRecord.getDatasetId(), + logRecord.getResourceId()); + ILSMIndexAccessor indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE, + NoOpOperationCallback.INSTANCE); if (logRecord.getNewOp() == IndexOperation.INSERT.ordinal()) { indexAccessor.forceDelete(logRecord.getNewValue()); } else if (logRecord.getNewOp() == IndexOperation.DELETE.ordinal()) { @@ -871,10 +678,9 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { try { int datasetId = logRecord.getDatasetId(); long resourceId = logRecord.getResourceId(); - ILSMIndex index = - (ILSMIndex) datasetLifecycleManager.getIndex(datasetId, resourceId); - ILSMIndexAccessor indexAccessor = - index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); + ILSMIndex index = (ILSMIndex) datasetLifecycleManager.getIndex(datasetId, resourceId); + ILSMIndexAccessor indexAccessor = 
index.createAccessor(NoOpOperationCallback.INSTANCE, + NoOpOperationCallback.INSTANCE); if (logRecord.getNewOp() == IndexOperation.INSERT.ordinal()) { indexAccessor.forceInsert(logRecord.getNewValue()); } else if (logRecord.getNewOp() == IndexOperation.DELETE.ordinal()) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aa00bf69/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/ReplicationCheckpointManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/ReplicationCheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/ReplicationCheckpointManager.java new file mode 100644 index 0000000..6fdee33 --- /dev/null +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/ReplicationCheckpointManager.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.transaction.management.service.recovery; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; +import java.util.logging.Logger; + +import org.apache.asterix.common.api.IDatasetLifecycleManager; +import org.apache.asterix.common.cluster.ClusterPartition; +import org.apache.asterix.common.config.IPropertiesProvider; +import org.apache.asterix.common.config.MetadataProperties; +import org.apache.asterix.common.replication.IReplicaResourcesManager; +import org.apache.asterix.common.replication.IReplicationManager; +import org.apache.asterix.common.transactions.CheckpointProperties; +import org.apache.asterix.common.transactions.ICheckpointManager; +import org.apache.asterix.common.transactions.ITransactionSubsystem; +import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +/** + * An implementation of {@link ICheckpointManager} that defines the logic + * of checkpoints when replication is enabled. + */ +public class ReplicationCheckpointManager extends AbstractCheckpointManager { + + private static final Logger LOGGER = Logger.getLogger(ReplicationCheckpointManager.class.getName()); + + public ReplicationCheckpointManager(ITransactionSubsystem txnSubsystem, CheckpointProperties checkpointProperties) { + super(txnSubsystem, checkpointProperties); + } + + /** + * Performs a sharp checkpoint. All datasets are flushed and all transaction + * log files are deleted except the files that are needed for dead replicas.
+ */ + @Override + public synchronized void doSharpCheckpoint() throws HyracksDataException { + LOGGER.info("Starting sharp checkpoint..."); + final IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider() + .getDatasetLifecycleManager(); + datasetLifecycleManager.flushAllDatasets(); + long minFirstLSN; + // If shutting down, need to check if we need to keep any remote logs for dead replicas + if (txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext().isShuttingdown()) { + final Set<String> deadReplicaIds = txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext() + .getReplicationManager().getDeadReplicasIds(); + if (deadReplicaIds.isEmpty()) { + // No dead replicas => no need to keep any log + minFirstLSN = SHARP_CHECKPOINT_LSN; + } else { + // Get min LSN of dead replicas remote resources + minFirstLSN = getDeadReplicasMinFirstLSN(deadReplicaIds); + } + } else { + // Start up complete checkpoint. Avoid deleting remote recovery logs. + minFirstLSN = txnSubsystem.getRecoveryManager().getMinFirstLSN(); + } + capture(minFirstLSN, true); + if (minFirstLSN == SHARP_CHECKPOINT_LSN) { + // No need to keep any logs + txnSubsystem.getLogManager().renewLogFiles(); + } else { + // Delete only log files with LSNs < any dead replica partition minimum LSN + txnSubsystem.getLogManager().deleteOldLogFiles(minFirstLSN); + } + LOGGER.info("Completed sharp checkpoint."); + } + + /*** + * Attempts to perform a soft checkpoint at the specified {@code checkpointTargetLSN}. + * If a checkpoint cannot be captured due to datasets having LSN < {@code checkpointTargetLSN}, + * an asynchronous flush is triggered on them. If the checkpoint fails due to a replica index, + * a request is sent to the primary replica of the index to flush it. + * When a checkpoint is successful, all transaction log files that end with + * LSN < {@code checkpointTargetLSN} are deleted. 
+ */ + @Override + public synchronized long tryCheckpoint(long checkpointTargetLSN) throws HyracksDataException { + LOGGER.info("Attemping soft checkpoint..."); + final long minFirstLSN = txnSubsystem.getRecoveryManager().getMinFirstLSN(); + boolean checkpointSucceeded = minFirstLSN >= checkpointTargetLSN; + if (!checkpointSucceeded) { + // Flush datasets with indexes behind target checkpoint LSN + final IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider() + .getDatasetLifecycleManager(); + datasetLifecycleManager.scheduleAsyncFlushForLaggingDatasets(checkpointTargetLSN); + // Request remote replicas to flush lagging indexes + final IReplicationManager replicationManager = txnSubsystem.getAsterixAppRuntimeContextProvider() + .getAppContext().getReplicationManager(); + try { + replicationManager.requestFlushLaggingReplicaIndexes(checkpointTargetLSN); + } catch (IOException e) { + throw new HyracksDataException(e); + } + } + capture(minFirstLSN, false); + if (checkpointSucceeded) { + txnSubsystem.getLogManager().deleteOldLogFiles(minFirstLSN); + LOGGER.info(String.format("soft checkpoint succeeded with at LSN(%s)", minFirstLSN)); + } + return minFirstLSN; + } + + private long getDeadReplicasMinFirstLSN(Set<String> deadReplicaIds) { + final IReplicaResourcesManager remoteResourcesManager = txnSubsystem.getAsterixAppRuntimeContextProvider() + .getAppContext().getReplicaResourcesManager(); + final IPropertiesProvider propertiesProvider = (IPropertiesProvider) txnSubsystem + .getAsterixAppRuntimeContextProvider().getAppContext(); + final MetadataProperties metadataProperties = propertiesProvider.getMetadataProperties(); + final PersistentLocalResourceRepository localResourceRepository = + (PersistentLocalResourceRepository) txnSubsystem + .getAsterixAppRuntimeContextProvider().getLocalResourceRepository(); + // Get partitions of the dead replicas that are not active on this node + final Set<Integer> deadReplicasPartitions 
= new HashSet<>(); + for (String deadReplicaId : deadReplicaIds) { + final ClusterPartition[] nodePartitons = metadataProperties.getNodePartitions().get(deadReplicaId); + for (ClusterPartition partition : nodePartitons) { + if (!localResourceRepository.getActivePartitions().contains(partition.getPartitionId())) { + deadReplicasPartitions.add(partition.getPartitionId()); + } + } + } + return remoteResourcesManager.getPartitionsMinLSN(deadReplicasPartitions); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aa00bf69/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java index f035029..b08ecbb 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java @@ -145,6 +145,7 @@ public class TransactionManager implements ITransactionManager, ILifeCycleCompon } } + @Override public int getMaxJobId() { return maxJobId.get(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aa00bf69/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java index ce1752a..09183fe 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java @@ -22,21 +22,25 @@ import java.util.concurrent.Callable; import java.util.concurrent.Future; import java.util.logging.Logger; -import org.apache.asterix.common.config.ReplicationProperties; -import org.apache.asterix.common.config.TransactionProperties; import org.apache.asterix.common.config.ClusterProperties; import org.apache.asterix.common.config.IPropertiesProvider; +import org.apache.asterix.common.config.ReplicationProperties; +import org.apache.asterix.common.config.TransactionProperties; import org.apache.asterix.common.exceptions.ACIDException; +import org.apache.asterix.common.transactions.Checkpoint; +import org.apache.asterix.common.transactions.CheckpointProperties; import org.apache.asterix.common.transactions.IAppRuntimeContextProvider; +import org.apache.asterix.common.transactions.ICheckpointManager; import org.apache.asterix.common.transactions.ILockManager; import org.apache.asterix.common.transactions.ILogManager; import org.apache.asterix.common.transactions.IRecoveryManager; import org.apache.asterix.common.transactions.ITransactionManager; import org.apache.asterix.common.transactions.ITransactionSubsystem; +import org.apache.asterix.common.utils.StorageConstants; import org.apache.asterix.transaction.management.service.locking.ConcurrentLockManager; import org.apache.asterix.transaction.management.service.logging.LogManager; import org.apache.asterix.transaction.management.service.logging.LogManagerWithReplication; -import org.apache.asterix.transaction.management.service.recovery.CheckpointThread; 
+import org.apache.asterix.transaction.management.service.recovery.CheckpointManagerFactory; import org.apache.asterix.transaction.management.service.recovery.RecoveryManager; /** @@ -50,8 +54,8 @@ public class TransactionSubsystem implements ITransactionSubsystem { private final ITransactionManager transactionManager; private final IRecoveryManager recoveryManager; private final IAppRuntimeContextProvider asterixAppRuntimeContextProvider; - private final CheckpointThread checkpointThread; private final TransactionProperties txnProperties; + private final ICheckpointManager checkpointManager; //for profiling purpose public static final boolean IS_PROFILE_MODE = false;//true @@ -66,6 +70,15 @@ public class TransactionSubsystem implements ITransactionSubsystem { this.txnProperties = txnProperties; this.transactionManager = new TransactionManager(this); this.lockManager = new ConcurrentLockManager(txnProperties.getLockManagerShrinkTimer()); + final boolean replicationEnabled = ClusterProperties.INSTANCE.isReplicationEnabled(); + final CheckpointProperties checkpointProperties = new CheckpointProperties(txnProperties, id); + checkpointManager = CheckpointManagerFactory.create(this, checkpointProperties, replicationEnabled); + final Checkpoint latestCheckpoint = checkpointManager.getLatest(); + if (latestCheckpoint != null && latestCheckpoint.getStorageVersion() != StorageConstants.VERSION) { + throw new IllegalStateException( + String.format("Storage version mismatch. Current version (%s). 
On disk version: (%s)", + StorageConstants.VERSION, latestCheckpoint.getStorageVersion())); + } ReplicationProperties asterixReplicationProperties = null; if (asterixAppRuntimeContextProvider != null) { @@ -73,22 +86,13 @@ public class TransactionSubsystem implements ITransactionSubsystem { .getAppContext()).getReplicationProperties(); } - if (asterixReplicationProperties != null && ClusterProperties.INSTANCE.isReplicationEnabled()) { + if (asterixReplicationProperties != null && replicationEnabled) { this.logManager = new LogManagerWithReplication(this); } else { this.logManager = new LogManager(this); } - this.recoveryManager = new RecoveryManager(this); - if (asterixAppRuntimeContextProvider != null) { - this.checkpointThread = new CheckpointThread(recoveryManager, logManager, - this.txnProperties.getCheckpointLSNThreshold(), this.txnProperties.getCheckpointPollFrequency()); - this.checkpointThread.start(); - } else { - this.checkpointThread = null; - } - if (IS_PROFILE_MODE) { ecp = new EntityCommitProfiler(this, this.txnProperties.getCommitProfilerReportInterval()); fecp = (Future<Object>) getAsterixAppRuntimeContextProvider().getThreadExecutor().submit(ecp); @@ -133,6 +137,11 @@ public class TransactionSubsystem implements ITransactionSubsystem { ++profilerEntityCommitLogCount; } + @Override + public ICheckpointManager getCheckpointManager() { + return checkpointManager; + } + /** * Thread for profiling entity level commit count * This thread takes a report interval (in seconds) parameter and