Michael Blow has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2106

Change subject: [NO ISSUE][*DB] LogFlusher fixes
......................................................................

[NO ISSUE][*DB] LogFlusher fixes

Change-Id: I19e150f2560573738938967f389a397ad7150a4d
---
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
2 files changed, 61 insertions(+), 60 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/06/2106/1

diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
index 081cf02..fa5e1bd 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
@@ -105,15 +105,15 @@
                 if (logRecord.getLogType() == LogType.JOB_COMMIT || 
logRecord.getLogType() == LogType.ABORT
                         || logRecord.getLogType() == LogType.WAIT) {
                     logRecord.isFlushed(false);
-                    syncCommitQ.offer(logRecord);
+                    syncCommitQ.add(logRecord);
                 }
                 if (logRecord.getLogType() == LogType.FLUSH) {
                     logRecord.isFlushed(false);
-                    flushQ.offer(logRecord);
+                    flushQ.add(logRecord);
                 }
             } else if (logRecord.getLogSource() == LogSource.REMOTE
                     && (logRecord.getLogType() == LogType.JOB_COMMIT || 
logRecord.getLogType() == LogType.ABORT)) {
-                remoteJobsQ.offer(logRecord);
+                remoteJobsQ.add(logRecord);
             }
             this.notify();
         }
@@ -169,12 +169,13 @@
 
     @Override
     public void flush() {
+        boolean interrupted = false;
         try {
             int endOffset;
             while (!full.get()) {
-                synchronized (this) {
-                    if (appendOffset - flushOffset == 0 && !full.get()) {
-                        try {
+                try {
+                    synchronized (this) {
+                        if (appendOffset - flushOffset == 0 && !full.get()) {
                             if (IS_DEBUG_MODE) {
                                 LOGGER.info("flush()| appendOffset: " + 
appendOffset + ", flushOffset: " + flushOffset
                                         + ", full: " + full.get());
@@ -183,14 +184,14 @@
                                 fileChannel.close();
                                 return;
                             }
-                            this.wait();
-                        } catch (InterruptedException e) {
-                            continue;
+                            wait();
                         }
+                        endOffset = appendOffset;
                     }
-                    endOffset = appendOffset;
-                }
                 internalFlush(flushOffset, endOffset);
+                } catch (InterruptedException e) {
+                    interrupted = true;
+                }
             }
             internalFlush(flushOffset, appendOffset);
             if (isLastPage) {
@@ -198,6 +199,10 @@
             }
         } catch (IOException e) {
             throw new IllegalStateException(e);
+        } finally {
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
         }
     }
 
@@ -230,7 +235,7 @@
         if (endOffset > beginOffset) {
             logBufferTailReader.initializeScan(beginOffset, endOffset);
 
-            ITransactionContext txnCtx = null;
+            ITransactionContext txnCtx;
 
             LogRecord logRecord = logBufferTailReader.next();
             while (logRecord != null) {
@@ -327,8 +332,9 @@
     }
 
     @Override
-    public void stop() {
-        this.stop = true;
+    public synchronized void stop() {
+        stop = true;
+        notifyAll();
     }
 
     @Override
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index e5e91e8..1f45103 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -33,6 +33,7 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
@@ -162,7 +163,7 @@
             }
         }
 
-        /**
+        /*
          * To eliminate the case where the modulo of the next appendLSN = 0 
(the next
          * appendLSN = the first LSN of the next log file), we do not allow a 
log to be
          * written at the last offset of the current file.
@@ -616,13 +617,11 @@
      * The deadlock happens when 
PrimaryIndexOpeartionTracker.completeOperation results in generating a FLUSH 
log and there are no empty log buffers available to log it.
      */
     private class FlushLogsLogger extends Thread {
-        private ILogRecord logRecord;
-
         @Override
         public void run() {
             while (true) {
                 try {
-                    logRecord = flushLogsQ.take();
+                    ILogRecord logRecord = flushLogsQ.take();
                     appendToLogTail(logRecord);
                 } catch (ACIDException e) {
                     e.printStackTrace();
@@ -641,77 +640,73 @@
     private final LinkedBlockingQueue<ILogBuffer> emptyQ;
     private final LinkedBlockingQueue<ILogBuffer> flushQ;
     private final LinkedBlockingQueue<ILogBuffer> stashQ;
-    private ILogBuffer flushPage;
-    private final AtomicBoolean isStarted;
-    private final AtomicBoolean terminateFlag;
+    private volatile ILogBuffer flushPage;
+    private final Semaphore started;
 
-    public LogFlusher(LogManager logMgr, LinkedBlockingQueue<ILogBuffer> 
emptyQ, LinkedBlockingQueue<ILogBuffer> flushQ,
+    LogFlusher(LogManager logMgr, LinkedBlockingQueue<ILogBuffer> emptyQ, 
LinkedBlockingQueue<ILogBuffer> flushQ,
             LinkedBlockingQueue<ILogBuffer> stashQ) {
         this.logMgr = logMgr;
         this.emptyQ = emptyQ;
         this.flushQ = flushQ;
         this.stashQ = stashQ;
-        flushPage = null;
-        isStarted = new AtomicBoolean(false);
-        terminateFlag = new AtomicBoolean(false);
-
+        this.started = new Semaphore(0);
     }
 
     public void terminate() {
         //make sure the LogFlusher thread started before terminating it.
-        synchronized (isStarted) {
-            while (!isStarted.get()) {
-                try {
-                    isStarted.wait();
-                } catch (InterruptedException e) {
-                    //ignore
-                }
+        boolean interrupted = false;
+        while (true) {
+            try {
+                started.acquire();
+                break;
+            } catch (InterruptedException e) { // NOSONAR- will re-interrupt 
after termination is processed
+                interrupted = true;
             }
         }
 
-        terminateFlag.set(true);
-        if (flushPage != null) {
-            synchronized (flushPage) {
-                flushPage.stop();
-                flushPage.notify();
+        final ILogBuffer currentFlushPage = flushPage;
+        if (currentFlushPage != null) {
+            currentFlushPage.stop();
+        }
+        while (true) {
+            try {
+                flushQ.put(POISON_PILL);
+                break;
+            } catch (InterruptedException e) { // NOSONAR- will re-interrupt 
after termination is processed
+                interrupted = true;
             }
         }
-        //[Notice]
-        //The return value doesn't need to be checked
-        //since terminateFlag will trigger termination if the flushQ is full.
-        flushQ.offer(POISON_PILL);
+        if (interrupted) {
+            Thread.currentThread().interrupt();
+        }
     }
 
     @Override
-    public Boolean call() {
-        synchronized (isStarted) {
-            isStarted.set(true);
-            isStarted.notify();
-        }
+    public Boolean call() throws InterruptedException {
+        started.release();
+        boolean interrupted = false;
         try {
             while (true) {
                 flushPage = null;
                 try {
                     flushPage = flushQ.take();
-                    if (flushPage == POISON_PILL || terminateFlag.get()) {
+                    if (flushPage == POISON_PILL) {
                         return true;
                     }
-                } catch (InterruptedException e) {
-                    if (flushPage == null) {
-                        continue;
-                    }
+                    flushPage.flush();
+                    // TODO(mblow): recycle large pages
+                    emptyQ.add(flushPage.getLogPageSize() == 
logMgr.getLogPageSize() ? flushPage : stashQ.remove());
+                } catch (InterruptedException e) { // NOSONAR: will rethrow 
after we have finished flushing
+                    interrupted = true;
                 }
-                flushPage.flush();
-                emptyQ.offer(flushPage.getLogPageSize() == 
logMgr.getLogPageSize() ? flushPage : stashQ.remove());
             }
         } catch (Exception e) {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                
LOGGER.info("-------------------------------------------------------------------------");
-                LOGGER.info("LogFlusher is terminating abnormally. System is 
in unusalbe state.");
-                
LOGGER.info("-------------------------------------------------------------------------");
-            }
-            e.printStackTrace();
+            LOGGER.log(Level.SEVERE, "LogFlusher is terminating abnormally. 
System is in unusable state.", e);
             throw e;
+        } finally {
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
         }
     }
 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2106
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I19e150f2560573738938967f389a397ad7150a4d
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Michael Blow <mb...@apache.org>

Reply via email to