Michael Blow has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/2106
Change subject: [NO ISSUE][*DB] LogFlusher fixes ...................................................................... [NO ISSUE][*DB] LogFlusher fixes Change-Id: I19e150f2560573738938967f389a397ad7150a4d --- M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java 2 files changed, 61 insertions(+), 60 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/06/2106/1 diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java index 081cf02..fa5e1bd 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java @@ -105,15 +105,15 @@ if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT || logRecord.getLogType() == LogType.WAIT) { logRecord.isFlushed(false); - syncCommitQ.offer(logRecord); + syncCommitQ.add(logRecord); } if (logRecord.getLogType() == LogType.FLUSH) { logRecord.isFlushed(false); - flushQ.offer(logRecord); + flushQ.add(logRecord); } } else if (logRecord.getLogSource() == LogSource.REMOTE && (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT)) { - remoteJobsQ.offer(logRecord); + remoteJobsQ.add(logRecord); } this.notify(); } @@ -169,12 +169,13 @@ @Override public void flush() { + boolean interrupted = false; try { int endOffset; while (!full.get()) { - synchronized (this) { - if (appendOffset - flushOffset == 0 && !full.get()) { - try { + try { + synchronized (this) { + if (appendOffset - flushOffset == 0 && !full.get()) { if (IS_DEBUG_MODE) { LOGGER.info("flush()| appendOffset: " + appendOffset + ", flushOffset: " + flushOffset + ", full: " + full.get()); @@ -183,14 +184,14 @@ fileChannel.close(); return; } - this.wait(); - } catch (InterruptedException e) { - continue; + wait(); } + endOffset = appendOffset; } - endOffset = appendOffset; - } internalFlush(flushOffset, endOffset); + } catch (InterruptedException e) { + interrupted = true; + } } internalFlush(flushOffset, appendOffset); if (isLastPage) { @@ -198,6 +199,10 @@ } } catch (IOException e) { throw new IllegalStateException(e); + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } } } @@ -230,7 +235,7 @@ if (endOffset > beginOffset) { logBufferTailReader.initializeScan(beginOffset, endOffset); - ITransactionContext txnCtx = null; + ITransactionContext txnCtx; LogRecord logRecord = logBufferTailReader.next(); while (logRecord != null) { @@ -327,8 +332,9 @@ } @Override - public void stop() { - this.stop = true; + public synchronized void stop() { + stop = true; + notifyAll(); } @Override diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java index e5e91e8..1f45103 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java @@ -33,6 +33,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; @@ -162,7 +163,7 @@ } } - /** + /* * To eliminate the case where the modulo of the next appendLSN = 0 (the next * appendLSN = the first LSN of the next log file), we do not allow a log to be * written at the last offset of the current file. @@ -616,13 +617,11 @@ * The deadlock happens when PrimaryIndexOpeartionTracker.completeOperation results in generating a FLUSH log and there are no empty log buffers available to log it. */ private class FlushLogsLogger extends Thread { - private ILogRecord logRecord; - @Override public void run() { while (true) { try { - logRecord = flushLogsQ.take(); + ILogRecord logRecord = flushLogsQ.take(); appendToLogTail(logRecord); } catch (ACIDException e) { e.printStackTrace(); @@ -641,77 +640,73 @@ private final LinkedBlockingQueue<ILogBuffer> emptyQ; private final LinkedBlockingQueue<ILogBuffer> flushQ; private final LinkedBlockingQueue<ILogBuffer> stashQ; - private ILogBuffer flushPage; - private final AtomicBoolean isStarted; - private final AtomicBoolean terminateFlag; + private volatile ILogBuffer flushPage; + private final Semaphore started; - public LogFlusher(LogManager logMgr, LinkedBlockingQueue<ILogBuffer> emptyQ, LinkedBlockingQueue<ILogBuffer> flushQ, + LogFlusher(LogManager logMgr, LinkedBlockingQueue<ILogBuffer> emptyQ, LinkedBlockingQueue<ILogBuffer> flushQ, LinkedBlockingQueue<ILogBuffer> stashQ) { this.logMgr = logMgr; this.emptyQ = emptyQ; this.flushQ = flushQ; this.stashQ = stashQ; - flushPage = null; - isStarted = new AtomicBoolean(false); - terminateFlag = new AtomicBoolean(false); - + this.started = new Semaphore(0); } public void terminate() { //make sure the LogFlusher thread started before terminating it. - synchronized (isStarted) { - while (!isStarted.get()) { - try { - isStarted.wait(); - } catch (InterruptedException e) { - //ignore - } + boolean interrupted = false; + while (true) { + try { + started.acquire(); + break; + } catch (InterruptedException e) { // NOSONAR- will re-interrupt after termination is processed + interrupted = true; } } - terminateFlag.set(true); - if (flushPage != null) { - synchronized (flushPage) { - flushPage.stop(); - flushPage.notify(); + final ILogBuffer currentFlushPage = flushPage; + if (currentFlushPage != null) { + currentFlushPage.stop(); + } + while (true) { + try { + flushQ.put(POISON_PILL); + break; + } catch (InterruptedException e) { // NOSONAR- will re-interrupt after termination is processed + interrupted = true; } } - //[Notice] - //The return value doesn't need to be checked - //since terminateFlag will trigger termination if the flushQ is full. - flushQ.offer(POISON_PILL); + if (interrupted) { + Thread.currentThread().interrupt(); + } } @Override - public Boolean call() { - synchronized (isStarted) { - isStarted.set(true); - isStarted.notify(); - } + public Boolean call() throws InterruptedException { + started.release(); + boolean interrupted = false; try { while (true) { flushPage = null; try { flushPage = flushQ.take(); - if (flushPage == POISON_PILL || terminateFlag.get()) { + if (flushPage == POISON_PILL) { return true; } - } catch (InterruptedException e) { - if (flushPage == null) { - continue; - } + flushPage.flush(); + // TODO(mblow): recycle large pages + emptyQ.add(flushPage.getLogPageSize() == logMgr.getLogPageSize() ? flushPage : stashQ.remove()); + } catch (InterruptedException e) { // NOSONAR: will rethrow after we have finished flushing + interrupted = true; } - flushPage.flush(); - emptyQ.offer(flushPage.getLogPageSize() == logMgr.getLogPageSize() ? flushPage : stashQ.remove()); } } catch (Exception e) { - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("-------------------------------------------------------------------------"); - LOGGER.info("LogFlusher is terminating abnormally. System is in unusalbe state."); - LOGGER.info("-------------------------------------------------------------------------"); - } - e.printStackTrace(); + LOGGER.log(Level.SEVERE, "LogFlusher is terminating abnormally. System is in unusable state.", e); throw e; + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } } } } -- To view, visit https://asterix-gerrit.ics.uci.edu/2106 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I19e150f2560573738938967f389a397ad7150a4d Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Michael Blow <mb...@apache.org>