Ian Maxon has submitted this change and it was merged. Change subject: [ASTERIXDB-1708][TX] Prevent log deletion during scan ......................................................................
[ASTERIXDB-1708][TX] Prevent log deletion during scan Right now there is a potential for a soft checkpoint to delete a log file that is about to be read as part of a transaction rollback. This patch stops the soft checkpoint from proceeding if a rollback is about to take place and vice-versa. Change-Id: Icff1a520af24c8fac8e5836cdbf46425b78b1260 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2508 Sonar-Qube: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Contrib: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java 6 files changed, 174 insertions(+), 74 deletions(-) Approvals: Anon. E. 
Moose #1000171: Jenkins: Verified; No violations found; ; Verified Murtadha Hubail: Looks good to me, approved diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java index 4a2cf2d..4b14a9c 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java @@ -59,6 +59,7 @@ import org.apache.asterix.common.transactions.ITransactionContext; import org.apache.asterix.common.transactions.ITransactionSubsystem; import org.apache.asterix.common.transactions.LogType; +import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback; import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; import org.apache.asterix.transaction.management.service.logging.LogManager; @@ -104,6 +105,7 @@ private SystemState state; private final INCServiceContext serviceCtx; private final INcApplicationContext appCtx; + private static final TxnId recoveryTxnId = new TxnId(-1); public RecoveryManager(ITransactionSubsystem txnSubsystem, INCServiceContext serviceCtx) { this.serviceCtx = serviceCtx; @@ -505,21 +507,24 @@ } @Override - public void replayReplicaPartitionLogs(Set<Integer> partitions, boolean flush) throws HyracksDataException { - long minLSN = getPartitionsMinLSN(partitions); - long readableSmallestLSN = logMgr.getReadableSmallestLSN(); - if (minLSN < readableSmallestLSN) { - minLSN = readableSmallestLSN; - } - + public synchronized void replayReplicaPartitionLogs(Set<Integer> partitions, boolean flush) + throws HyracksDataException { //replay logs > minLSN that belong to these partitions try { + checkpointManager.secure(recoveryTxnId); + long minLSN = getPartitionsMinLSN(partitions); + long readableSmallestLSN = 
logMgr.getReadableSmallestLSN(); + if (minLSN < readableSmallestLSN) { + minLSN = readableSmallestLSN; + } replayPartitionsLogs(partitions, logMgr.getLogReader(true), minLSN); if (flush) { appCtx.getDatasetLifecycleManager().flushAllDatasets(); } } catch (IOException | ACIDException e) { throw HyracksDataException.create(e); + } finally { + checkpointManager.completed(recoveryTxnId); } } diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java index 91d98e5..418282e 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java @@ -28,16 +28,19 @@ import org.apache.asterix.app.bootstrap.TestNodeController; import org.apache.asterix.app.data.gen.TupleGenerator; import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction; +import org.apache.asterix.app.nc.RecoveryManager; import org.apache.asterix.common.config.DatasetConfig.DatasetType; -import org.apache.asterix.common.config.TransactionProperties; import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable; +import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.transactions.Checkpoint; import org.apache.asterix.common.transactions.ICheckpointManager; -import org.apache.asterix.common.transactions.IRecoveryManager; import org.apache.asterix.common.transactions.ITransactionContext; import org.apache.asterix.common.transactions.ITransactionManager; import org.apache.asterix.common.transactions.ITransactionSubsystem; +import org.apache.asterix.common.transactions.LogRecord; import org.apache.asterix.common.transactions.TransactionOptions; +import org.apache.asterix.common.transactions.TxnId; +import org.apache.asterix.common.utils.TransactionUtil; import 
org.apache.asterix.external.util.DataflowUtils; import org.apache.asterix.file.StorageComponentProvider; import org.apache.asterix.metadata.entities.Dataset; @@ -50,20 +53,23 @@ import org.apache.asterix.test.common.TestHelper; import org.apache.asterix.transaction.management.service.logging.LogManager; import org.apache.asterix.transaction.management.service.recovery.AbstractCheckpointManager; -import org.apache.commons.lang3.tuple.Pair; +import org.apache.asterix.transaction.management.service.transaction.TransactionManager; import org.apache.hyracks.api.comm.VSizeFrame; -import org.apache.hyracks.api.config.IOption; import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory; -import org.apache.hyracks.util.StorageUtil; -import org.apache.hyracks.util.StorageUtil.StorageUnit; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.stubbing.Answer; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; public class CheckpointingTest { @@ -88,6 +94,8 @@ private static final String DATASET_NAME = "TestDS"; private static final String DATA_TYPE_NAME = "DUMMY"; private static final String NODE_GROUP_NAME = "DEFAULT"; + private volatile boolean threadException = false; + private Throwable exception = null; @Before public void setUp() throws Exception { @@ -128,7 +136,7 @@ VSizeFrame frame = new VSizeFrame(ctx); FrameTupleAppender tupleAppender = new FrameTupleAppender(frame); - IRecoveryManager recoveryManager = nc.getTransactionSubsystem().getRecoveryManager(); + RecoveryManager recoveryManager = (RecoveryManager) 
nc.getTransactionSubsystem().getRecoveryManager(); ICheckpointManager checkpointManager = nc.getTransactionSubsystem().getCheckpointManager(); LogManager logManager = (LogManager) nc.getTransactionSubsystem().getLogManager(); // Number of log files after node startup should be one @@ -178,20 +186,70 @@ /* * At this point, the low-water mark is not in the initialLowWaterMarkFileId, so - * a checkpoint should delete it. + * a checkpoint should delete it. We will also start a second + * job to ensure that the checkpointing coexists peacefully + * with other concurrent readers of the log that request + * deletions to be withheld */ - checkpointManager.tryCheckpoint(recoveryManager.getMinFirstLSN()); - // Validate initialLowWaterMarkFileId was deleted - for (Long fileId : logManager.getLogFileIds()) { - Assert.assertNotEquals(initialLowWaterMarkFileId, fileId.longValue()); - } + JobId jobId2 = nc.newJobId(); + IHyracksTaskContext ctx2 = nc.createTestContext(jobId2, 0, false); + nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx2), + new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL)); + // Prepare insert operation + LSMInsertDeleteOperatorNodePushable insertOp2 = nc.getInsertPipeline(ctx2, dataset, KEY_TYPES, + RECORD_TYPE, META_TYPE, null, KEY_INDEXES, KEY_INDICATOR_LIST, storageManager, null).getLeft(); + insertOp2.open(); + VSizeFrame frame2 = new VSizeFrame(ctx2); + FrameTupleAppender tupleAppender2 = new FrameTupleAppender(frame2); + for (int i = 0; i < 4; i++) { + long lastCkpoint = recoveryManager.getMinFirstLSN(); + long lastFileId = logManager.getLogFileId(lastCkpoint); - if (tupleAppender.getTupleCount() > 0) { - tupleAppender.write(insertOp, true); + checkpointManager.tryCheckpoint(lowWaterMarkLSN); + // Validate initialLowWaterMarkFileId was deleted + for (Long fileId : logManager.getLogFileIds()) { + Assert.assertNotEquals(initialLowWaterMarkFileId, fileId.longValue()); + } + + while (currentLowWaterMarkLogFileId == 
lastFileId) { + ITupleReference tuple = tupleGenerator.next(); + DataflowUtils.addTupleToFrame(tupleAppender2, tuple, insertOp2); + lowWaterMarkLSN = recoveryManager.getMinFirstLSN(); + currentLowWaterMarkLogFileId = logManager.getLogFileId(lowWaterMarkLSN); + } } - insertOp.close(); - nc.getTransactionManager().commitTransaction(txnCtx.getTxnId()); + Thread.UncaughtExceptionHandler h = new Thread.UncaughtExceptionHandler() { + public void uncaughtException(Thread th, Throwable ex) { + threadException = true; + exception = ex; + } + }; + + Thread t = new Thread(() -> { + TransactionManager spyTxnMgr = spy((TransactionManager) nc.getTransactionManager()); + doAnswer((Answer) i -> { + stallAbortTxn(Thread.currentThread(), txnCtx, nc.getTransactionSubsystem(), + (TxnId) i.getArguments()[0]); + return null; + }).when(spyTxnMgr).abortTransaction(any(TxnId.class)); + + spyTxnMgr.abortTransaction(txnCtx.getTxnId()); + }); + t.setUncaughtExceptionHandler(h); + synchronized (t) { + t.start(); + t.wait(); + } + long lockedLSN = recoveryManager.getMinFirstLSN(); + checkpointManager.tryCheckpoint(lockedLSN); + synchronized (t) { + t.notifyAll(); + } + t.join(); + if (threadException) { + throw exception; + } } finally { nc.deInit(); } @@ -201,6 +259,32 @@ } } + private void stallAbortTxn(Thread t, ITransactionContext txnCtx, ITransactionSubsystem txnSubsystem, TxnId txnId) + throws InterruptedException, HyracksDataException { + + try { + if (txnCtx.isWriteTxn()) { + LogRecord logRecord = new LogRecord(); + TransactionUtil.formJobTerminateLogRecord(txnCtx, logRecord, false); + txnSubsystem.getLogManager().log(logRecord); + txnSubsystem.getCheckpointManager().secure(txnId); + synchronized (t) { + t.notifyAll(); + t.wait(); + } + txnSubsystem.getRecoveryManager().rollbackTransaction(txnCtx); + txnCtx.setTxnState(ITransactionManager.ABORTED); + } + } catch (ACIDException | HyracksDataException e) { + String msg = "Could not complete rollback! 
System is in an inconsistent state"; + throw new ACIDException(msg, e); + } finally { + txnCtx.complete(); + txnSubsystem.getLockManager().releaseLocks(txnCtx); + txnSubsystem.getCheckpointManager().completed(txnId); + } + } + @Test public void testCorruptedCheckpointFiles() { try { diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java index 9e7eb0d..e3cf8b8 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java @@ -46,4 +46,19 @@ * @throws HyracksDataException */ long tryCheckpoint(long checkpointTargetLSN) throws HyracksDataException; -} \ No newline at end of file + + /** + * Secures the current low-water mark until the transaction identified by {@code id} completes. + * + * @param id + * @throws HyracksDataException + */ + void secure(TxnId id) throws HyracksDataException; + + /** + * Notifies this {@link ICheckpointManager} that the transaction identified by {@code id} completed. 
+ * + * @param id + */ + void completed(TxnId id); +} diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java index 3ada608..736de07 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java @@ -76,7 +76,6 @@ private final String logFilePrefix; private final MutableLong flushLSN; private final String nodeId; - private final HashMap<Long, Integer> txnLogFileId2ReaderCount = new HashMap<>(); private final long logFileSize; private final int logPageSize; private final AtomicLong appendLSN; @@ -407,24 +406,20 @@ /** * At this point, any future LogReader should read from LSN >= checkpointLSN */ - synchronized (txnLogFileId2ReaderCount) { - for (Long id : logFileIds) { - /** - * Stop deletion if: - * The log file which contains the checkpointLSN has been reached. - * The oldest log file being accessed by a LogReader has been reached. - */ - if (id >= checkpointLSNLogFileID - || (txnLogFileId2ReaderCount.containsKey(id) && txnLogFileId2ReaderCount.get(id) > 0)) { - break; - } - //delete old log file - File file = new File(getLogFilePath(id)); - file.delete(); - txnLogFileId2ReaderCount.remove(id); - if (LOGGER.isInfoEnabled()) { - LOGGER.info("Deleted log file " + file.getAbsolutePath()); - } + for (Long id : logFileIds) { + /** + * Stop deletion if: + * The log file which contains the checkpointLSN has been reached. + * The oldest log file being accessed by a LogReader has been reached. 
+ */ + if (id >= checkpointLSNLogFileID) { + break; + } + //delete old log file + File file = new File(getLogFilePath(id)); + file.delete(); + if (LOGGER.isInfoEnabled()) { + LOGGER.info("Deleted log file " + file.getAbsolutePath()); } } } @@ -450,7 +445,6 @@ } private long deleteAllLogFiles() { - txnLogFileId2ReaderCount.clear(); List<Long> logFileIds = getLogFileIds(); if (!logFileIds.isEmpty()) { for (Long id : logFileIds) { @@ -607,7 +601,6 @@ RandomAccessFile raf = new RandomAccessFile(new File(logFilePath), "r"); FileChannel newFileChannel = raf.getChannel(); TxnLogFile logFile = new TxnLogFile(this, newFileChannel, fileId, fileId * logFileSize); - touchLogFile(fileId); return logFile; } @@ -617,32 +610,6 @@ LOGGER.warn(() -> "Closing log file with id(" + logFileRef.getLogFileId() + ") with a closed channel."); } fileChannel.close(); - untouchLogFile(logFileRef.getLogFileId()); - } - - private void touchLogFile(long fileId) { - synchronized (txnLogFileId2ReaderCount) { - if (txnLogFileId2ReaderCount.containsKey(fileId)) { - txnLogFileId2ReaderCount.put(fileId, txnLogFileId2ReaderCount.get(fileId) + 1); - } else { - txnLogFileId2ReaderCount.put(fileId, 1); - } - } - } - - private void untouchLogFile(long fileId) { - synchronized (txnLogFileId2ReaderCount) { - if (txnLogFileId2ReaderCount.containsKey(fileId)) { - int newReaderCount = txnLogFileId2ReaderCount.get(fileId) - 1; - if (newReaderCount < 0) { - throw new IllegalStateException( - "Invalid log file reader count (ID=" + fileId + ", count: " + newReaderCount + ")"); - } - txnLogFileId2ReaderCount.put(fileId, newReaderCount); - } else { - throw new IllegalStateException("Trying to close log file id(" + fileId + ") which was not opened."); - } - } } /** diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java index a541bd9..6efd0e5 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java @@ -22,9 +22,14 @@ import org.apache.asterix.common.transactions.CheckpointProperties; import org.apache.asterix.common.transactions.ICheckpointManager; import org.apache.asterix.common.transactions.ITransactionSubsystem; +import org.apache.asterix.common.transactions.TxnId; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; /** * An implementation of {@link ICheckpointManager} that defines the logic @@ -33,9 +38,12 @@ public class CheckpointManager extends AbstractCheckpointManager { private static final Logger LOGGER = LogManager.getLogger(); + private static final long NO_SECURED_LSN = -1l; + private final Map<TxnId, Long> securedLSNs; public CheckpointManager(ITransactionSubsystem txnSubsystem, CheckpointProperties checkpointProperties) { super(txnSubsystem, checkpointProperties); + securedLSNs = new HashMap<>(); } /** @@ -62,6 +70,10 @@ @Override public synchronized long tryCheckpoint(long checkpointTargetLSN) throws HyracksDataException { LOGGER.info("Attemping soft checkpoint..."); + final long minSecuredLSN = getMinSecuredLSN(); + if (minSecuredLSN != NO_SECURED_LSN && checkpointTargetLSN >= minSecuredLSN) { + return minSecuredLSN; + } final long minFirstLSN = txnSubsystem.getRecoveryManager().getMinFirstLSN(); boolean checkpointSucceeded = minFirstLSN >= checkpointTargetLSN; if (!checkpointSucceeded) { @@ -77,4 +89,18 @@ } return minFirstLSN; } -} \ No newline at 
end of file + + @Override + public synchronized void secure(TxnId id) throws HyracksDataException { + securedLSNs.put(id, txnSubsystem.getRecoveryManager().getMinFirstLSN()); + } + + @Override + public synchronized void completed(TxnId id) { + securedLSNs.remove(id); + } + + private synchronized long getMinSecuredLSN() { + return securedLSNs.isEmpty() ? NO_SECURED_LSN : Collections.min(securedLSNs.values()); + } +} diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java index 76ecc63..c218dec 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java @@ -33,6 +33,7 @@ import org.apache.asterix.common.transactions.TransactionOptions; import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.common.utils.TransactionUtil; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.lifecycle.ILifeCycleComponent; import org.apache.hyracks.util.annotations.ThreadSafe; import org.apache.logging.log4j.Level; @@ -103,10 +104,11 @@ LogRecord logRecord = new LogRecord(); TransactionUtil.formJobTerminateLogRecord(txnCtx, logRecord, false); txnSubsystem.getLogManager().log(logRecord); + txnSubsystem.getCheckpointManager().secure(txnId); txnSubsystem.getRecoveryManager().rollbackTransaction(txnCtx); txnCtx.setTxnState(ITransactionManager.ABORTED); } - } catch (ACIDException e) { + } catch (HyracksDataException e) { String msg = "Could not complete rollback! 
System is in an inconsistent state"; if (LOGGER.isErrorEnabled()) { LOGGER.log(Level.ERROR, msg, e); @@ -116,6 +118,7 @@ txnCtx.complete(); txnSubsystem.getLockManager().releaseLocks(txnCtx); txnCtxRepository.remove(txnCtx.getTxnId()); + txnSubsystem.getCheckpointManager().completed(txnId); } } -- To view, visit https://asterix-gerrit.ics.uci.edu/2508 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: Icff1a520af24c8fac8e5836cdbf46425b78b1260 Gerrit-PatchSet: 3 Gerrit-Project: asterixdb Gerrit-Branch: release-0.9.4-pre-rc Gerrit-Owner: Ian Maxon <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Ian Maxon <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]>
