Murtadha Hubail has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/2165
Change subject: [NO ISSUE][TX] Make TxnLogFile Close Idempotent ...................................................................... [NO ISSUE][TX] Make TxnLogFile Close Idempotent - user model changes: no - storage format changes: no - interface changes: yes Renamed ILogReader.initializeScan to setPosition and added javadocs. Details: Currently there is an explicit check that the file channel of a TxnLogFile is open before closing it. However, the channel may already have been closed due to an interrupt, so we should remove the explicit check and always try to close it. In addition, we should always decrement the TxnLogFile reference counter even if the channel is not open, since that TxnLogFile is no longer accessed. Change-Id: I255e4b9af0bc78298c0a25daf0b5629d413eba6a --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogReader.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java 4 files changed, 47 insertions(+), 47 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/65/2165/1 diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java index 7bc5697..19966fe 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java @@ -201,7 +201,7 @@ jobId2WinnerEntitiesMap = new HashMap<>(); //set log reader to the lowWaterMarkLsn ILogRecord logRecord; - logReader.initializeScan(lowWaterMarkLSN); + logReader.setPosition(lowWaterMarkLSN); logRecord = logReader.next(); while (logRecord != null) { if 
(IS_DEBUG_MODE) { @@ -300,7 +300,7 @@ ILogRecord logRecord = null; try { - logReader.initializeScan(lowWaterMarkLSN); + logReader.setPosition(lowWaterMarkLSN); logRecord = logReader.next(); while (logRecord != null) { if (IS_DEBUG_MODE) { @@ -540,7 +540,7 @@ Set<Integer> activePartitions = localResourceRepository.getActivePartitions(); ILogReader logReader = logMgr.getLogReader(false); try { - logReader.initializeScan(firstLSN); + logReader.setPosition(firstLSN); ILogRecord logRecord = null; while (currentLSN < lastLSN) { logRecord = logReader.next(); diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogReader.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogReader.java index da188e3..8539e2b 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogReader.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogReader.java @@ -18,18 +18,34 @@ */ package org.apache.asterix.common.transactions; -import org.apache.asterix.common.exceptions.ACIDException; - public interface ILogReader { - public void initializeScan(long beginLSN) throws ACIDException; + /** + * Sets the log reader position at log sequence number with value {@code lsn}. + * + * @param lsn + */ + void setPosition(long lsn); - //for scanning - public ILogRecord next() throws ACIDException; + /** + * Reads and returns the log record located at the log reader current position. After reading the log record, + * the log reader position is incremented by the size of the read log. + * + * @return the log record + */ + ILogRecord next(); - //for random reading - public ILogRecord read(long readLSN) throws ACIDException; + /** + * Reads and returns the log record with log sequence number {@code lsn}. 
+ * + * @param lsn + * @return The log record + */ + ILogRecord read(long lsn); - public void close() throws ACIDException; + /** + * Closes the log reader and any resources used. + */ + void close(); -} +} \ No newline at end of file diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java index dd0a5c7..1cf7a50 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java @@ -579,9 +579,6 @@ @Override public void closeLogFile(TxnLogFile logFileRef, FileChannel fileChannel) throws IOException { - if (!fileChannel.isOpen()) { - throw new IllegalStateException("File channel is not open"); - } fileChannel.close(); untouchLogFile(logFileRef.getLogFileId()); } diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java index 148aa7e..4994134 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java @@ -30,14 +30,11 @@ import org.apache.asterix.common.transactions.LogRecord; import org.apache.asterix.common.transactions.MutableLong; import org.apache.asterix.common.transactions.TxnLogFile; +import org.apache.hyracks.util.annotations.NotThreadSafe; -/** - * NOTE: Many method calls of this class are not thread safe. - * Be very cautious using it in a multithreaded context. 
- */ +@NotThreadSafe public class LogReader implements ILogReader { - public static final boolean IS_DEBUG_MODE = false;//true private static final Logger LOGGER = Logger.getLogger(LogReader.class.getName()); private final ILogManager logMgr; private final long logFileSize; @@ -54,7 +51,7 @@ private enum ReturnState { FLUSH, EOF - }; + } public LogReader(ILogManager logMgr, long logFileSize, int logPageSize, MutableLong flushLSN, boolean isRecoveryMode) { @@ -68,8 +65,8 @@ } @Override - public void initializeScan(long beginLSN) throws ACIDException { - readLSN = beginLSN; + public void setPosition(long lsn) { + readLSN = lsn; if (waitForFlushOrReturnIfEOF() == ReturnState.EOF) { return; } @@ -84,7 +81,7 @@ * @throws ACIDException */ @Override - public ILogRecord next() throws ACIDException { + public ILogRecord next() { if (waitForFlushOrReturnIfEOF() == ReturnState.EOF) { return null; } @@ -147,13 +144,10 @@ return ReturnState.EOF; } try { - if (IS_DEBUG_MODE) { - LOGGER.info( - "waitForFlushOrReturnIfEOF()| flushLSN: " + flushLSN.get() + ", readLSN: " + readLSN); - } flushLSN.wait(); } catch (InterruptedException e) { - //ignore + Thread.currentThread().interrupt(); + throw new ACIDException(e); } } return ReturnState.FLUSH; @@ -166,10 +160,9 @@ * @return true if log continues, false if EOF * @throws ACIDException */ - private boolean refillLogReadBuffer() throws ACIDException { + private boolean refillLogReadBuffer() { try { if (readLSN % logFileSize == logFile.size()) { - logFile.close(); readLSN += logFileSize - (readLSN % logFileSize); getLogFile(); } @@ -183,14 +176,12 @@ * Fills the log buffer with data from the log file at the current position * * @return false if EOF, true otherwise - * @throws ACIDException */ - - private boolean fillLogReadBuffer() throws ACIDException { + private boolean fillLogReadBuffer() { return fillLogReadBuffer(logPageSize, readBuffer); } - private boolean fillLogReadBuffer(int readSize, ByteBuffer readBuffer) throws 
ACIDException { + private boolean fillLogReadBuffer(int readSize, ByteBuffer readBuffer) { int size = 0; int read = 0; readBuffer.position(0); @@ -217,10 +208,9 @@ return true; } - //for random reading @Override - public ILogRecord read(long LSN) throws ACIDException { - readLSN = LSN; + public ILogRecord read(long lsn) { + readLSN = lsn; //wait for the log to be flushed if needed before trying to read it. synchronized (flushLSN) { while (readLSN >= flushLSN.get()) { @@ -232,13 +222,8 @@ } } try { - if (logFile == null) { + if (logFile == null || readLSN < fileBeginLSN || readLSN >= fileBeginLSN + logFile.size()) { //get the log file which contains readLSN - getLogFile(); - fillLogReadBuffer(); - } else if (readLSN < fileBeginLSN || readLSN >= fileBeginLSN + logFile.size()) { - //log is not in the current log file - logFile.close(); getLogFile(); fillLogReadBuffer(); } else if (readLSN < bufferBeginLSN || readLSN >= bufferBeginLSN + readBuffer.limit()) { @@ -265,7 +250,7 @@ case TRUNCATED: { if (!fillLogReadBuffer()) { throw new IllegalStateException( - "Could not read LSN(" + LSN + ") from log file id " + logFile.getLogFileId()); + "Could not read LSN(" + lsn + ") from log file id " + logFile.getLogFileId()); } //now read the complete log record continue; @@ -285,8 +270,10 @@ return logRecord; } - private void getLogFile() throws ACIDException { + private void getLogFile() { try { + // close existing file (if any) before opening another one + close(); logFile = logMgr.getLogFile(readLSN); fileBeginLSN = logFile.getFileBeginLSN(); } catch (IOException e) { @@ -295,7 +282,7 @@ } @Override - public void close() throws ACIDException { + public void close() { try { if (logFile != null) { logFile.close(); -- To view, visit https://asterix-gerrit.ics.uci.edu/2165 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I255e4b9af0bc78298c0a25daf0b5629d413eba6a Gerrit-PatchSet: 1 Gerrit-Project: asterixdb 
Gerrit-Branch: master Gerrit-Owner: Murtadha Hubail <mhub...@apache.org>