Murtadha Hubail has submitted this change and it was merged.

Change subject: [ASTERIXDB-1464][TX] Handle Interrupts in LogManager
......................................................................
[ASTERIXDB-1464][TX] Handle Interrupts in LogManager

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Properly handle interrupts on log page and log file switch in LogManager.
- Propagate interrupts on transactor threads.
- Ignore interrupts while waiting for txn commit/abort log to be flushed.
- Add test case for handling interrupts in LogManager.
- Add test case for interrupting a txn waiting for commit log.

Change-Id: I6936a60dc572e07f01baabc3f8af3bf88a58f661
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2248
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Contrib: Jenkins <[email protected]>
Reviewed-by: Michael Blow <[email protected]>
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
A asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
M asterixdb/asterix-app/src/test/resources/log4j2-test.xml
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
9 files changed, 470 insertions(+), 175 deletions(-)

Approvals:
  Anon. E.
Moose #1000171: Jenkins: Verified; ; Verified Michael Blow: Looks good to me, approved Objections: Jenkins: Violations found diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index de0c88f..8552a1c 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -3059,6 +3059,7 @@ * @param mdTxnCtx */ public static void abort(Exception rootE, Exception parentE, MetadataTransactionContext mdTxnCtx) { + boolean interrupted = Thread.interrupted(); try { if (IS_DEBUG_MODE) { LOGGER.log(Level.ERROR, rootE.getMessage(), rootE); @@ -3069,6 +3070,10 @@ } catch (Exception e2) { parentE.addSuppressed(e2); throw new IllegalStateException(rootE); + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java index f6a4aac..1a14864 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java @@ -32,6 +32,7 @@ import org.apache.asterix.app.active.ActiveNotificationHandler; import org.apache.asterix.common.api.IMetadataLockManager; import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.exceptions.ExceptionUtils; import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.common.utils.JobUtils; import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory; @@ -173,9 +174,16 @@ try { work.run(); done = true; - } catch (InterruptedException e) { - LOGGER.log(Level.WARN, "Retry with attempt " + (++retryCount), e); - 
interruptedException = e; + } catch (Exception e) { + Throwable rootCause = ExceptionUtils.getRootCause(e); + if (rootCause instanceof java.lang.InterruptedException) { + interruptedException = (InterruptedException) rootCause; + // clear the interrupted state from the thread + Thread.interrupted(); + LOGGER.log(Level.WARN, "Retry with attempt " + (++retryCount), e); + continue; + } + throw e; } } while (!done); diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java index 6cf1940..128aee6 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java @@ -19,6 +19,7 @@ package org.apache.asterix.common; import java.io.InputStream; +import java.rmi.RemoteException; import java.util.Arrays; import java.util.LinkedHashSet; import java.util.Map; @@ -28,11 +29,18 @@ import org.apache.asterix.common.api.IMetadataLockManager; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.utils.Servlets; +import org.apache.asterix.metadata.MetadataManager; +import org.apache.asterix.metadata.MetadataTransactionContext; +import org.apache.asterix.metadata.bootstrap.MetadataBuiltinEntities; import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.asterix.metadata.entities.Dataset; +import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil; import org.apache.asterix.rebalance.NoOpDatasetRebalanceCallback; import org.apache.asterix.test.common.TestExecutor; import org.apache.asterix.testframework.context.TestCaseContext; import org.apache.asterix.utils.RebalanceUtil; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.api.io.FileSplit; import org.junit.Assert; import com.fasterxml.jackson.databind.JsonNode; @@ -158,4 +166,52 @@ 
metadataProvider.getLocks().unlock(); } } + + /** + * Gets the reference of dataset {@code dataset} from metadata + * + * @param integrationUtil + * @param datasetName + * @return the dataset reference if found. Otherwise null. + * @throws AlgebricksException + * @throws RemoteException + */ + public static Dataset getDataset(AsterixHyracksIntegrationUtil integrationUtil, String datasetName) + throws AlgebricksException, RemoteException { + final ICcApplicationContext appCtx = + (ICcApplicationContext) integrationUtil.getClusterControllerService().getApplicationContext(); + final MetadataProvider metadataProvider = new MetadataProvider(appCtx, null); + final MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); + metadataProvider.setMetadataTxnContext(mdTxnCtx); + Dataset dataset; + try { + dataset = metadataProvider.findDataset(MetadataBuiltinEntities.DEFAULT_DATAVERSE_NAME, datasetName); + } finally { + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); + metadataProvider.getLocks().unlock(); + } + return dataset; + } + + /** + * Gets the file splits of {@code dataset} + * + * @param integrationUtil + * @param dataset + * @return the file splits of the dataset + * @throws RemoteException + * @throws AlgebricksException + */ + public static FileSplit[] getDatasetSplits(AsterixHyracksIntegrationUtil integrationUtil, Dataset dataset) + throws RemoteException, AlgebricksException { + final ICcApplicationContext ccAppCtx = + (ICcApplicationContext) integrationUtil.getClusterControllerService().getApplicationContext(); + final MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); + try { + return SplitsAndConstraintsUtil + .getIndexSplits(dataset, dataset.getDatasetName(), mdTxnCtx, ccAppCtx.getClusterStateManager()); + } finally { + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); + } + } } diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java index 1d31bc0..decee99 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java @@ -22,9 +22,6 @@ import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; -import java.util.logging.ConsoleHandler; -import java.util.logging.Level; -import java.util.logging.Logger; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -213,6 +210,38 @@ } } + @Test + public void surviveInterruptOnMetadataTxnCommit() throws Exception { + ICcApplicationContext appCtx = + (ICcApplicationContext) integrationUtil.getClusterControllerService().getApplicationContext(); + final MetadataProvider metadataProvider = new MetadataProvider(appCtx, null); + final MetadataTransactionContext mdTxn = MetadataManager.INSTANCE.beginTransaction(); + metadataProvider.setMetadataTxnContext(mdTxn); + final String nodeGroupName = "ng"; + Thread transactor = new Thread(() -> { + final List<String> ngNodes = Arrays.asList("asterix_nc1"); + try { + MetadataManager.INSTANCE.addNodegroup(mdTxn, new NodeGroup(nodeGroupName, ngNodes)); + Thread.currentThread().interrupt(); + MetadataManager.INSTANCE.commitTransaction(mdTxn); + } catch (Exception e) { + e.printStackTrace(); + } + }); + transactor.start(); + transactor.join(); + // ensure that the node group was added + final MetadataTransactionContext readMdTxn = MetadataManager.INSTANCE.beginTransaction(); + try { + final NodeGroup nodegroup = MetadataManager.INSTANCE.getNodegroup(readMdTxn, nodeGroupName); + if (nodegroup == null) { + throw new AssertionError("nodegroup was found after metadata txn was aborted"); + } + } finally { + MetadataManager.INSTANCE.commitTransaction(readMdTxn); + } + } + private void addDataset(ICcApplicationContext 
appCtx, Dataset source, int datasetPostfix, boolean abort) throws Exception { Dataset dataset = new Dataset(source.getDataverseName(), "ds_" + datasetPostfix, source.getDataverseName(), diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java new file mode 100644 index 0000000..f43f3ff --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.test.txn; + +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil; +import org.apache.asterix.common.TestDataUtil; +import org.apache.asterix.common.api.IDatasetLifecycleManager; +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.config.GlobalConfig; +import org.apache.asterix.common.dataflow.DatasetLocalResource; +import org.apache.asterix.common.exceptions.ACIDException; +import org.apache.asterix.common.exceptions.ExceptionUtils; +import org.apache.asterix.common.transactions.DatasetId; +import org.apache.asterix.common.transactions.ILockManager; +import org.apache.asterix.common.transactions.ILogManager; +import org.apache.asterix.common.transactions.ITransactionContext; +import org.apache.asterix.common.transactions.ITransactionManager; +import org.apache.asterix.common.transactions.LogRecord; +import org.apache.asterix.common.transactions.LogType; +import org.apache.asterix.common.transactions.TransactionOptions; +import org.apache.asterix.common.transactions.TxnId; +import org.apache.asterix.common.utils.TransactionUtil; +import org.apache.asterix.metadata.entities.Dataset; +import org.apache.asterix.test.common.TestTupleReference; +import org.apache.asterix.transaction.management.service.logging.LogManager; +import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants; +import org.apache.hyracks.api.io.FileSplit; +import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class LogManagerTest { + + protected static final String 
TEST_CONFIG_FILE_NAME = "src/main/resources/cc.conf"; + private static final AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil(); + private static final String PREPARE_NEXT_LOG_FILE_METHOD = "prepareNextLogFile"; + + @Before + public void setUp() throws Exception { + System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, TEST_CONFIG_FILE_NAME); + integrationUtil.init(true, TEST_CONFIG_FILE_NAME); + } + + @After + public void tearDown() throws Exception { + integrationUtil.deinit(true); + } + + @Test + public void interruptedLogPageSwitch() throws Exception { + final INcApplicationContext ncAppCtx = (INcApplicationContext) integrationUtil.ncs[0].getApplicationContext(); + final String nodeId = ncAppCtx.getServiceContext().getNodeId(); + + final String datasetName = "ds"; + TestDataUtil.createIdOnlyDataset(datasetName); + final Dataset dataset = TestDataUtil.getDataset(integrationUtil, datasetName); + final String indexPath = getIndexPath(dataset, nodeId); + final IDatasetLifecycleManager dclm = ncAppCtx.getDatasetLifecycleManager(); + dclm.open(indexPath); + final ILSMIndex index = (ILSMIndex) dclm.get(indexPath); + final long resourceId = ncAppCtx.getLocalResourceRepository().get(indexPath).getId(); + final DatasetLocalResource datasetLocalResource = + (DatasetLocalResource) ncAppCtx.getLocalResourceRepository().get(indexPath).getResource(); + final ITransactionContext txnCtx = beingTransaction(ncAppCtx, index, resourceId); + final ILogManager logManager = ncAppCtx.getTransactionSubsystem().getLogManager(); + final ILockManager lockManager = ncAppCtx.getTransactionSubsystem().getLockManager(); + final DatasetId datasetId = new DatasetId(dataset.getDatasetId()); + final int[] pkFields = dataset.getPrimaryBloomFilterFields(); + final int fieldsLength = pkFields.length; + final TestTupleReference tuple = new TestTupleReference(fieldsLength); + tuple.getFields()[0].getDataOutput().write(1); + final int partition = 
datasetLocalResource.getPartition(); + + // ensure interrupted thread will be interrupted on allocating next log page + final AtomicBoolean interrupted = new AtomicBoolean(false); + Thread interruptedTransactor = new Thread(() -> { + Thread.currentThread().interrupt(); + try { + for (int i = 0; i < 10000; i++) { + lockManager.lock(datasetId, i, LockManagerConstants.LockMode.S, txnCtx); + LogRecord logRecord = new LogRecord(); + TransactionUtil.formEntityCommitLogRecord(logRecord, txnCtx, datasetId.getId(), i, tuple, pkFields, + partition, LogType.ENTITY_COMMIT); + logManager.log(logRecord); + } + } catch (ACIDException e) { + Throwable rootCause = ExceptionUtils.getRootCause(e); + if (rootCause instanceof java.lang.InterruptedException) { + interrupted.set(true); + } + } + }); + interruptedTransactor.start(); + interruptedTransactor.join(); + Assert.assertTrue(interrupted.get()); + + // ensure next thread will be able to allocate next page + final AtomicInteger failCount = new AtomicInteger(0); + Thread transactor = new Thread(() -> { + try { + for (int i = 0; i < 10000; i++) { + lockManager.lock(datasetId, i, LockManagerConstants.LockMode.S, txnCtx); + LogRecord logRecord = new LogRecord(); + TransactionUtil.formEntityCommitLogRecord(logRecord, txnCtx, datasetId.getId(), i, tuple, pkFields, + partition, LogType.ENTITY_COMMIT); + logManager.log(logRecord); + } + } catch (Exception e) { + failCount.incrementAndGet(); + } + }); + transactor.start(); + transactor.join(); + Assert.assertEquals(0, failCount.get()); + } + + @Test + public void interruptedLogFileSwitch() throws Exception { + final INcApplicationContext ncAppCtx = (INcApplicationContext) integrationUtil.ncs[0].getApplicationContext(); + final LogManager logManager = (LogManager) ncAppCtx.getTransactionSubsystem().getLogManager(); + int logFileCountBeforeInterrupt = logManager.getLogFileIds().size(); + + // ensure an interrupted transactor will create next log file but will fail to position the log channel 
+ final AtomicBoolean interrupted = new AtomicBoolean(false); + Thread interruptedTransactor = new Thread(() -> { + Thread.currentThread().interrupt(); + try { + prepareNextLogFile(logManager); + } catch (Exception e) { + Throwable rootCause = ExceptionUtils.getRootCause(e); + if (rootCause.getCause() instanceof java.nio.channels.ClosedByInterruptException) { + interrupted.set(true); + } + } + }); + interruptedTransactor.start(); + interruptedTransactor.join(); + // ensure a new log file was created but the thread was interrupt + int logFileCountAfterInterrupt = logManager.getLogFileIds().size(); + Assert.assertEquals(logFileCountBeforeInterrupt + 1, logFileCountAfterInterrupt); + Assert.assertTrue(interrupted.get()); + + // ensure next transactor will not create another file + final AtomicBoolean failed = new AtomicBoolean(false); + Thread transactor = new Thread(() -> { + try { + prepareNextLogFile(logManager); + } catch (Exception e) { + failed.set(true); + } + }); + transactor.start(); + transactor.join(); + // make sure no new files were created and the operation was successful + int countAfterTransactor = logManager.getLogFileIds().size(); + Assert.assertEquals(logFileCountAfterInterrupt, countAfterTransactor); + Assert.assertFalse(failed.get()); + + // make sure we can still log to the new file + interruptedLogPageSwitch(); + } + + private static String getIndexPath(Dataset dataset, String nodeId) throws Exception { + final FileSplit[] datasetSplits = TestDataUtil.getDatasetSplits(integrationUtil, dataset); + final Optional<FileSplit> nodeFileSplit = + Arrays.stream(datasetSplits).filter(s -> s.getNodeName().equals(nodeId)).findFirst(); + Assert.assertTrue(nodeFileSplit.isPresent()); + return nodeFileSplit.get().getPath(); + } + + private static ITransactionContext beingTransaction(INcApplicationContext ncAppCtx, ILSMIndex index, + long resourceId) { + final TxnId txnId = new TxnId(1); + final TransactionOptions options = new 
TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL); + final ITransactionManager transactionManager = ncAppCtx.getTransactionSubsystem().getTransactionManager(); + final ITransactionContext txnCtx = transactionManager.beginTransaction(txnId, options); + txnCtx.register(resourceId, index, NoOpOperationCallback.INSTANCE, true); + return txnCtx; + } + + private static void prepareNextLogFile(LogManager logManager) throws Exception { + Method method; + try { + method = LogManager.class.getDeclaredMethod(PREPARE_NEXT_LOG_FILE_METHOD, null); + } catch (Exception e) { + throw new IllegalStateException( + "Couldn't find " + PREPARE_NEXT_LOG_FILE_METHOD + " in LogManager. Was it renamed?"); + } + method.setAccessible(true); + method.invoke(logManager, null); + } +} \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/log4j2-test.xml b/asterixdb/asterix-app/src/test/resources/log4j2-test.xml index d17fad7..08eee10 100644 --- a/asterixdb/asterix-app/src/test/resources/log4j2-test.xml +++ b/asterixdb/asterix-app/src/test/resources/log4j2-test.xml @@ -27,5 +27,11 @@ <Logger name="org.apache.asterix.test" level="WARN"> <AppenderRef ref="Console"/> </Logger> + <Logger name="org.apache.asterix.transaction.management.service.logging.LogFlusher" level="INFO"> + <AppenderRef ref="Console"/> + </Logger> + <Logger name="org.apache.asterix.utils.RebalanceUtil" level="INFO"> + <AppenderRef ref="Console"/> + </Logger> </Loggers> </Configuration> \ No newline at end of file diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java index dcf8250..b67da80 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java +++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java @@ -125,14 +125,6 @@ this.fileChannel = fileChannel; } - public void setInitialFlushOffset(long offset) { - try { - fileChannel.position(offset); - } catch (IOException e) { - throw new IllegalStateException(e); - } - } - @Override public synchronized void setFull() { this.full.set(true); @@ -146,7 +138,7 @@ @Override public boolean hasSpace(int logSize) { - return appendOffset + logSize <= logPageSize; + return appendOffset + logSize <= logPageSize && !full.get(); } @Override diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java index 4ce9c71..833f8f6 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java @@ -24,6 +24,9 @@ import java.io.OutputStream; import java.io.RandomAccessFile; import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -46,6 +49,7 @@ import org.apache.asterix.common.transactions.ITransactionManager; import org.apache.asterix.common.transactions.ITransactionSubsystem; import org.apache.asterix.common.transactions.LogManagerProperties; +import org.apache.asterix.common.transactions.LogSource; import org.apache.asterix.common.transactions.LogType; import org.apache.asterix.common.transactions.MutableLong; import org.apache.asterix.common.transactions.TxnLogFile; @@ -75,9 +79,9 @@ private final String nodeId; private final FlushLogsLogger flushLogsLogger; private final HashMap<Long, 
Integer> txnLogFileId2ReaderCount = new HashMap<>(); - protected final long logFileSize; - protected final int logPageSize; - protected final AtomicLong appendLSN; + private final long logFileSize; + private final int logPageSize; + private final AtomicLong appendLSN; /* * Mutables */ @@ -85,7 +89,7 @@ private LinkedBlockingQueue<ILogBuffer> flushQ; private LinkedBlockingQueue<ILogBuffer> stashQ; private FileChannel appendChannel; - protected ILogBuffer appendPage; + private ILogBuffer appendPage; private LogFlusher logFlusher; private Future<? extends Object> futureLogFlusher; protected LinkedBlockingQueue<ILogRecord> flushLogsQ; @@ -112,15 +116,19 @@ flushQ = new LinkedBlockingQueue<>(numLogPages); stashQ = new LinkedBlockingQueue<>(numLogPages); for (int i = 0; i < numLogPages; i++) { - emptyQ.offer(new LogBuffer(txnSubsystem, logPageSize, flushLSN)); + emptyQ.add(new LogBuffer(txnSubsystem, logPageSize, flushLSN)); } appendLSN.set(initializeLogAnchor(nextLogFileId)); flushLSN.set(appendLSN.get()); if (LOGGER.isInfoEnabled()) { LOGGER.info("LogManager starts logging in LSN: " + appendLSN); } - appendChannel = getFileChannel(appendLSN.get(), false); - getAndInitNewPage(INITIAL_LOG_SIZE); + try { + setLogPosition(appendLSN.get()); + } catch (IOException e) { + throw new ACIDException(e); + } + initNewPage(INITIAL_LOG_SIZE); logFlusher = new LogFlusher(this, emptyQ, flushQ, stashQ); futureLogFlusher = txnSubsystem.getAsterixAppRuntimeContextProvider().getThreadExecutor().submit(logFlusher); if (!flushLogsLogger.isAlive()) { @@ -129,55 +137,43 @@ } @Override - public void log(ILogRecord logRecord) throws ACIDException { + public void log(ILogRecord logRecord) { if (logRecord.getLogType() == LogType.FLUSH) { - flushLogsQ.offer(logRecord); + flushLogsQ.add(logRecord); return; } appendToLogTail(logRecord); } - protected void appendToLogTail(ILogRecord logRecord) throws ACIDException { + protected void appendToLogTail(ILogRecord logRecord) { 
syncAppendToLogTail(logRecord); - - if ((logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT - || logRecord.getLogType() == LogType.WAIT) && !logRecord.isFlushed()) { - synchronized (logRecord) { - while (!logRecord.isFlushed()) { - try { + if (waitForFlush(logRecord) && !logRecord.isFlushed()) { + InvokeUtil.doUninterruptibly(() -> { + synchronized (logRecord) { + while (!logRecord.isFlushed()) { logRecord.wait(); - } catch (InterruptedException e) { - //ignore } } - } + }); } } - protected synchronized void syncAppendToLogTail(ILogRecord logRecord) throws ACIDException { - if (logRecord.getLogType() != LogType.FLUSH) { + protected static boolean waitForFlush(ILogRecord logRecord) { + final byte logType = logRecord.getLogType(); + return logType == LogType.JOB_COMMIT || logType == LogType.ABORT || logType == LogType.WAIT; + } + + synchronized void syncAppendToLogTail(ILogRecord logRecord) { + if (logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.FLUSH) { ITransactionContext txnCtx = logRecord.getTxnCtx(); if (txnCtx.getTxnState() == ITransactionManager.ABORTED && logRecord.getLogType() != LogType.ABORT) { throw new ACIDException( "Aborted txn(" + txnCtx.getTxnId() + ") tried to write non-abort type log record."); } } - - /* - * To eliminate the case where the modulo of the next appendLSN = 0 (the next - * appendLSN = the first LSN of the next log file), we do not allow a log to be - * written at the last offset of the current file. 
- */ final int logSize = logRecord.getLogSize(); - // Make sure the log will not exceed the log file size - if (getLogFileOffset(appendLSN.get()) + logSize >= logFileSize) { - prepareNextLogFile(); - prepareNextPage(logSize); - } else if (!appendPage.hasSpace(logSize)) { - prepareNextPage(logSize); - } + ensureSpace(logSize); appendPage.append(logRecord, appendLSN.get()); - if (logRecord.getLogType() == LogType.FLUSH) { logRecord.setLSN(appendLSN.get()); } @@ -187,70 +183,98 @@ appendLSN.addAndGet(logSize); } - protected void prepareNextPage(int logSize) { - appendPage.setFull(); - getAndInitNewPage(logSize); - } - - protected void getAndInitNewPage(int logSize) { - if (logSize > logPageSize) { - // before creating a new page, we need to stash a normal sized page since our queues have fixed capacity - appendPage = null; - while (appendPage == null) { - try { - appendPage = emptyQ.take(); - stashQ.add(appendPage); - } catch (InterruptedException e) { - //ignore - } - } - // for now, alloc a new buffer for each large page - // TODO: pool large pages?? - appendPage = new LogBuffer(txnSubsystem, logSize, flushLSN); - appendPage.setFileChannel(appendChannel); - flushQ.offer(appendPage); - } else { - appendPage = null; - while (appendPage == null) { - try { - appendPage = emptyQ.take(); - } catch (InterruptedException e) { - //ignore - } - } - appendPage.reset(); - appendPage.setFileChannel(appendChannel); - flushQ.offer(appendPage); + private void ensureSpace(int logSize) { + if (!fileHasSpace(logSize)) { + ensureLastPageFlushed(); + prepareNextLogFile(); + } + if (!appendPage.hasSpace(logSize)) { + prepareNextPage(logSize); } } - protected void prepareNextLogFile() { + private boolean fileHasSpace(int logSize) { + /* + * To eliminate the case where the modulo of the next appendLSN = 0 (the next + * appendLSN = the first LSN of the next log file), we do not allow a log to be + * written at the last offset of the current file. 
+ */ + return getLogFileOffset(appendLSN.get()) + logSize < logFileSize; + } + + private void prepareNextPage(int logSize) { + appendPage.setFull(); + initNewPage(logSize); + } + + private void initNewPage(int logSize) { + boolean largePage = logSize > logPageSize; + // if a new large page will be allocated, we need to stash a normal sized page + // since our queues have fixed capacity + ensureAvailablePage(largePage); + if (largePage) { + // for now, alloc a new buffer for each large page + // TODO: pool large pages?? + appendPage = new LogBuffer(txnSubsystem, logSize, flushLSN); + } else { + appendPage.reset(); + } + appendPage.setFileChannel(appendChannel); + flushQ.add(appendPage); + } + + private void ensureAvailablePage(boolean stash) { + final ILogBuffer currentPage = appendPage; + appendPage = null; + try { + appendPage = emptyQ.take(); + if (stash) { + stashQ.add(appendPage); + } + } catch (InterruptedException e) { + appendPage = currentPage; + Thread.currentThread().interrupt(); + throw new ACIDException(e); + } + } + + private void prepareNextLogFile() { + final long nextFileBeginLsn = getNextFileFirstLsn(); + try { + createNextLogFile(); + setLogPosition(nextFileBeginLsn); + } catch (IOException e) { + throw new ACIDException(e); + } + // move appendLSN and flushLSN to the first LSN of the next log file + // only after the file was created and the channel was positioned successfully + appendLSN.set(nextFileBeginLsn); + flushLSN.set(nextFileBeginLsn); + LOGGER.info("Created new txn log file with id({}) starting with LSN = {}", getLogFileId(nextFileBeginLsn), + nextFileBeginLsn); + } + + private long getNextFileFirstLsn() { + // add the remaining space in the current file + return appendLSN.get() + (logFileSize - getLogFileOffset(appendLSN.get())); + } + + private void ensureLastPageFlushed() { // Mark the page as the last page so that it will close the output file channel. appendPage.setLastPage(); // Make sure to flush whatever left in the log tail. 
appendPage.setFull(); - //wait until all log records have been flushed in the current file synchronized (flushLSN) { - try { - while (flushLSN.get() != appendLSN.get()) { - //notification will come from LogBuffer.internalFlush(.) + while (flushLSN.get() != appendLSN.get()) { + // notification will come from LogBuffer.internalFlush(.) + try { flushLSN.wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new ACIDException(e); } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); } } - //move appendLSN and flushLSN to the first LSN of the next log file - appendLSN.addAndGet(logFileSize - getLogFileOffset(appendLSN.get())); - flushLSN.set(appendLSN.get()); - appendChannel = getFileChannel(appendLSN.get(), true); - if (LOGGER.isInfoEnabled()) { - LOGGER.info("Created new txn log file with id(" + getLogFileId(appendLSN.get()) + ") starting with LSN = " - + appendLSN.get()); - } - //[Notice] - //the current log file channel is closed if - //LogBuffer.flush() completely flush the last page of the file. } @Override @@ -386,8 +410,8 @@ * The log file which contains the checkpointLSN has been reached. * The oldest log file being accessed by a LogReader has been reached. 
*/ - if (id >= checkpointLSNLogFileID - || (txnLogFileId2ReaderCount.containsKey(id) && txnLogFileId2ReaderCount.get(id) > 0)) { + if (id >= checkpointLSNLogFileID || (txnLogFileId2ReaderCount.containsKey(id) + && txnLogFileId2ReaderCount.get(id) > 0)) { break; } @@ -475,11 +499,11 @@ return logFileIds; } - public String getLogFilePath(long fileId) { + private String getLogFilePath(long fileId) { return logDir + File.separator + logFilePrefix + "_" + fileId; } - public long getLogFileOffset(long lsn) { + private long getLogFileOffset(long lsn) { return lsn % logFileSize; } @@ -500,28 +524,24 @@ return (new File(path)).mkdir(); } - private FileChannel getFileChannel(long lsn, boolean create) { - FileChannel newFileChannel = null; - try { - long fileId = getLogFileId(lsn); - String logFilePath = getLogFilePath(fileId); - File file = new File(logFilePath); - if (create) { - if (!file.createNewFile()) { - throw new IllegalStateException(); - } - } else { - if (!file.exists()) { - throw new IllegalStateException(); - } - } - RandomAccessFile raf = new RandomAccessFile(new File(logFilePath), "rw"); - newFileChannel = raf.getChannel(); - newFileChannel.position(getLogFileOffset(lsn)); - } catch (IOException e) { - throw new IllegalStateException(e); + private void createNextLogFile() throws IOException { + final long nextFileBeginLsn = getNextFileFirstLsn(); + final long fileId = getLogFileId(nextFileBeginLsn); + final Path nextFilePath = Paths.get(getLogFilePath(fileId)); + if (nextFilePath.toFile().exists()) { + LOGGER.warn("Ignored create log file {} since file already exists", nextFilePath.toString()); + return; } - return newFileChannel; + Files.createFile(nextFilePath); + } + + private void setLogPosition(long lsn) throws IOException { + final long fileId = getLogFileId(lsn); + final Path targetFilePath = Paths.get(getLogFilePath(fileId)); + final long targetPosition = getLogFileOffset(lsn); + final RandomAccessFile raf = new 
RandomAccessFile(targetFilePath.toFile(), "rw"); // NOSONAR closed by LogBuffer + appendChannel = raf.getChannel(); + appendChannel.position(targetPosition); } @Override diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java index 99f365a..9b6a4f9 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java @@ -25,11 +25,10 @@ import org.apache.asterix.common.replication.IReplicationManager; import org.apache.asterix.common.replication.IReplicationStrategy; import org.apache.asterix.common.transactions.ILogRecord; -import org.apache.asterix.common.transactions.ITransactionContext; -import org.apache.asterix.common.transactions.ITransactionManager; import org.apache.asterix.common.transactions.ITransactionSubsystem; import org.apache.asterix.common.transactions.LogSource; import org.apache.asterix.common.transactions.LogType; +import org.apache.asterix.common.utils.InvokeUtil; public class LogManagerWithReplication extends LogManager { @@ -42,7 +41,7 @@ } @Override - public void log(ILogRecord logRecord) throws ACIDException { + public void log(ILogRecord logRecord) { boolean shouldReplicate = logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.WAIT; if (shouldReplicate) { switch (logRecord.getLogType()) { @@ -66,7 +65,7 @@ //Remote flush logs do not need to be flushed separately since they may not trigger local flush if (logRecord.getLogType() == LogType.FLUSH && logRecord.getLogSource() == LogSource.LOCAL) { - flushLogsQ.offer(logRecord); + flushLogsQ.add(logRecord); return; } @@ -74,7 
+73,7 @@ } @Override - protected void appendToLogTail(ILogRecord logRecord) throws ACIDException { + protected void appendToLogTail(ILogRecord logRecord) { syncAppendToLogTail(logRecord); if (logRecord.isReplicated()) { @@ -82,62 +81,26 @@ replicationManager.replicateLog(logRecord); } catch (InterruptedException e) { Thread.currentThread().interrupt(); + throw new ACIDException(e); } } - if (logRecord.getLogSource() == LogSource.LOCAL) { - if ((logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT - || logRecord.getLogType() == LogType.WAIT) && !logRecord.isFlushed()) { + if (logRecord.getLogSource() == LogSource.LOCAL && waitForFlush(logRecord) && !logRecord.isFlushed()) { + InvokeUtil.doUninterruptibly(() -> { synchronized (logRecord) { while (!logRecord.isFlushed()) { - try { - logRecord.wait(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } + logRecord.wait(); } - //wait for job Commit/Abort ACK from replicas if (logRecord.isReplicated() && (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT)) { while (!replicationManager.hasBeenReplicated(logRecord)) { - try { - logRecord.wait(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } + logRecord.wait(); } } } - } + }); } - } - - @Override - protected synchronized void syncAppendToLogTail(ILogRecord logRecord) throws ACIDException { - if (logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.FLUSH) { - ITransactionContext txnCtx = logRecord.getTxnCtx(); - if (txnCtx.getTxnState() == ITransactionManager.ABORTED && logRecord.getLogType() != LogType.ABORT) { - throw new ACIDException( - "Aborted txn(" + txnCtx.getTxnId() + ") tried to write non-abort type log record."); - } - } - - final int logRecordSize = logRecord.getLogSize(); - // Make sure the log will not exceed the log file size - if (getLogFileOffset(appendLSN.get()) + logRecordSize >= logFileSize) { - 
prepareNextLogFile(); - prepareNextPage(logRecordSize); - } else if (!appendPage.hasSpace(logRecordSize)) { - prepareNextPage(logRecordSize); - } - appendPage.append(logRecord, appendLSN.get()); - - if (logRecord.getLogType() == LogType.FLUSH) { - logRecord.setLSN(appendLSN.get()); - } - - appendLSN.addAndGet(logRecordSize); } @Override @@ -145,5 +108,4 @@ this.replicationManager = replicationManager; this.replicationStrategy = replicationManager.getReplicationStrategy(); } - } -- To view, visit https://asterix-gerrit.ics.uci.edu/2248 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I6936a60dc572e07f01baabc3f8af3bf88a58f661 Gerrit-PatchSet: 15 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]> Gerrit-Reviewer: abdullah alamoudi <[email protected]>
