Murtadha Hubail has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2165

Change subject: [NO ISSUE][TX] Make TxnLogFile Close Idempotent
......................................................................

[NO ISSUE][TX] Make TxnLogFile Close Idempotent

- user model changes: no
- storage format changes: no
- interface changes: yes
  Renamed ILogReader.initializeScan to setPosition and added
  javadocs.

Details:
Currently there is an explicit check that the file channel
of a TxnLogFile is open before closing it. However, the
channel may already have been closed due to an interrupt, so
we should remove the explicit check and always try to close
it. In addition, we should always decrement the TxnLogFile
reference counter even if the channel is not open, since
that TxnLogFile is not accessed anymore.

Change-Id: I255e4b9af0bc78298c0a25daf0b5629d413eba6a
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogReader.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java
4 files changed, 47 insertions(+), 47 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/65/2165/1

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 7bc5697..19966fe 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -201,7 +201,7 @@
         jobId2WinnerEntitiesMap = new HashMap<>();
         //set log reader to the lowWaterMarkLsn
         ILogRecord logRecord;
-        logReader.initializeScan(lowWaterMarkLSN);
+        logReader.setPosition(lowWaterMarkLSN);
         logRecord = logReader.next();
         while (logRecord != null) {
             if (IS_DEBUG_MODE) {
@@ -300,7 +300,7 @@
 
         ILogRecord logRecord = null;
         try {
-            logReader.initializeScan(lowWaterMarkLSN);
+            logReader.setPosition(lowWaterMarkLSN);
             logRecord = logReader.next();
             while (logRecord != null) {
                 if (IS_DEBUG_MODE) {
@@ -540,7 +540,7 @@
         Set<Integer> activePartitions = 
localResourceRepository.getActivePartitions();
         ILogReader logReader = logMgr.getLogReader(false);
         try {
-            logReader.initializeScan(firstLSN);
+            logReader.setPosition(firstLSN);
             ILogRecord logRecord = null;
             while (currentLSN < lastLSN) {
                 logRecord = logReader.next();
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogReader.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogReader.java
index da188e3..8539e2b 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogReader.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogReader.java
@@ -18,18 +18,34 @@
  */
 package org.apache.asterix.common.transactions;
 
-import org.apache.asterix.common.exceptions.ACIDException;
-
 public interface ILogReader {
 
-    public void initializeScan(long beginLSN) throws ACIDException;
+    /**
+     * Sets the log reader position at log sequence number with value {@code 
lsn}.
+     *
+     * @param lsn
+     */
+    void setPosition(long lsn);
 
-    //for scanning
-    public ILogRecord next() throws ACIDException;
+    /**
+     * Reads and returns the log record located at the log reader current 
position. After reading the log record,
+     * the log reader position is incremented by the size of the read log.
+     *
+     * @return the log record
+     */
+    ILogRecord next();
 
-    //for random reading
-    public ILogRecord read(long readLSN) throws ACIDException;
+    /**
+     * Reads and returns the log record with log sequence number {@code lsn}.
+     *
+     * @param lsn
+     * @return The log record
+     */
+    ILogRecord read(long lsn);
 
-    public void close() throws ACIDException;
+    /**
+     * Closes the log reader and any resources used.
+     */
+    void close();
 
-}
+}
\ No newline at end of file
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index dd0a5c7..1cf7a50 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -579,9 +579,6 @@
 
     @Override
     public void closeLogFile(TxnLogFile logFileRef, FileChannel fileChannel) 
throws IOException {
-        if (!fileChannel.isOpen()) {
-            throw new IllegalStateException("File channel is not open");
-        }
         fileChannel.close();
         untouchLogFile(logFileRef.getLogFileId());
     }
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java
index 148aa7e..4994134 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java
@@ -30,14 +30,11 @@
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.MutableLong;
 import org.apache.asterix.common.transactions.TxnLogFile;
+import org.apache.hyracks.util.annotations.NotThreadSafe;
 
-/**
- * NOTE: Many method calls of this class are not thread safe.
- * Be very cautious using it in a multithreaded context.
- */
+@NotThreadSafe
 public class LogReader implements ILogReader {
 
-    public static final boolean IS_DEBUG_MODE = false;//true
     private static final Logger LOGGER = 
Logger.getLogger(LogReader.class.getName());
     private final ILogManager logMgr;
     private final long logFileSize;
@@ -54,7 +51,7 @@
     private enum ReturnState {
         FLUSH,
         EOF
-    };
+    }
 
     public LogReader(ILogManager logMgr, long logFileSize, int logPageSize, 
MutableLong flushLSN,
             boolean isRecoveryMode) {
@@ -68,8 +65,8 @@
     }
 
     @Override
-    public void initializeScan(long beginLSN) throws ACIDException {
-        readLSN = beginLSN;
+    public void setPosition(long lsn) {
+        readLSN = lsn;
         if (waitForFlushOrReturnIfEOF() == ReturnState.EOF) {
             return;
         }
@@ -84,7 +81,7 @@
      * @throws ACIDException
      */
     @Override
-    public ILogRecord next() throws ACIDException {
+    public ILogRecord next() {
         if (waitForFlushOrReturnIfEOF() == ReturnState.EOF) {
             return null;
         }
@@ -147,13 +144,10 @@
                     return ReturnState.EOF;
                 }
                 try {
-                    if (IS_DEBUG_MODE) {
-                        LOGGER.info(
-                                "waitForFlushOrReturnIfEOF()| flushLSN: " + 
flushLSN.get() + ", readLSN: " + readLSN);
-                    }
                     flushLSN.wait();
                 } catch (InterruptedException e) {
-                    //ignore
+                    Thread.currentThread().interrupt();
+                    throw new ACIDException(e);
                 }
             }
             return ReturnState.FLUSH;
@@ -166,10 +160,9 @@
      * @return true if log continues, false if EOF
      * @throws ACIDException
      */
-    private boolean refillLogReadBuffer() throws ACIDException {
+    private boolean refillLogReadBuffer() {
         try {
             if (readLSN % logFileSize == logFile.size()) {
-                logFile.close();
                 readLSN += logFileSize - (readLSN % logFileSize);
                 getLogFile();
             }
@@ -183,14 +176,12 @@
      * Fills the log buffer with data from the log file at the current position
      *
      * @return false if EOF, true otherwise
-     * @throws ACIDException
      */
-
-    private boolean fillLogReadBuffer() throws ACIDException {
+    private boolean fillLogReadBuffer() {
         return fillLogReadBuffer(logPageSize, readBuffer);
     }
 
-    private boolean fillLogReadBuffer(int readSize, ByteBuffer readBuffer) 
throws ACIDException {
+    private boolean fillLogReadBuffer(int readSize, ByteBuffer readBuffer) {
         int size = 0;
         int read = 0;
         readBuffer.position(0);
@@ -217,10 +208,9 @@
         return true;
     }
 
-    //for random reading
     @Override
-    public ILogRecord read(long LSN) throws ACIDException {
-        readLSN = LSN;
+    public ILogRecord read(long lsn) {
+        readLSN = lsn;
         //wait for the log to be flushed if needed before trying to read it.
         synchronized (flushLSN) {
             while (readLSN >= flushLSN.get()) {
@@ -232,13 +222,8 @@
             }
         }
         try {
-            if (logFile == null) {
+            if (logFile == null || readLSN < fileBeginLSN || readLSN >= 
fileBeginLSN + logFile.size()) {
                 //get the log file which contains readLSN
-                getLogFile();
-                fillLogReadBuffer();
-            } else if (readLSN < fileBeginLSN || readLSN >= fileBeginLSN + 
logFile.size()) {
-                //log is not in the current log file
-                logFile.close();
                 getLogFile();
                 fillLogReadBuffer();
             } else if (readLSN < bufferBeginLSN || readLSN >= bufferBeginLSN + 
readBuffer.limit()) {
@@ -265,7 +250,7 @@
                 case TRUNCATED: {
                     if (!fillLogReadBuffer()) {
                         throw new IllegalStateException(
-                                "Could not read LSN(" + LSN + ") from log file 
id " + logFile.getLogFileId());
+                                "Could not read LSN(" + lsn + ") from log file 
id " + logFile.getLogFileId());
                     }
                     //now read the complete log record
                     continue;
@@ -285,8 +270,10 @@
         return logRecord;
     }
 
-    private void getLogFile() throws ACIDException {
+    private void getLogFile() {
         try {
+            // close existing file (if any) before opening another one
+            close();
             logFile = logMgr.getLogFile(readLSN);
             fileBeginLSN = logFile.getFileBeginLSN();
         } catch (IOException e) {
@@ -295,7 +282,7 @@
     }
 
     @Override
-    public void close() throws ACIDException {
+    public void close() {
         try {
             if (logFile != null) {
                 logFile.close();

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2165
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I255e4b9af0bc78298c0a25daf0b5629d413eba6a
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <mhub...@apache.org>

Reply via email to