Michael Blow has submitted this change and it was merged.

Change subject: Ensure LogManager Doesn't Exceed the Size of the Log Page Queues
......................................................................
Ensure LogManager Doesn't Exceed the Size of the Log Page Queues Change-Id: If6427576a31090a057ee6a3d25e35eef5cdd86f8 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1342 Reviewed-by: Michael Blow <[email protected]> Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> --- M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java 1 file changed, 22 insertions(+), 13 deletions(-) Approvals: Michael Blow: Looks good to me, approved Jenkins: Verified; Verified Objections: Jenkins: Violations found diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java index 814bcfc..57d5c39 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java @@ -67,6 +67,7 @@ private final MutableLong flushLSN; private LinkedBlockingQueue<LogBuffer> emptyQ; private LinkedBlockingQueue<LogBuffer> flushQ; + private LinkedBlockingQueue<LogBuffer> stashQ; protected final AtomicLong appendLSN; private FileChannel appendChannel; protected LogBuffer appendPage; @@ -97,8 +98,9 @@ } private void initializeLogManager(long nextLogFileId) { - emptyQ = new LinkedBlockingQueue<LogBuffer>(numLogPages); - flushQ = new LinkedBlockingQueue<LogBuffer>(numLogPages); + emptyQ = new LinkedBlockingQueue<>(numLogPages); + flushQ = new LinkedBlockingQueue<>(numLogPages); + stashQ = new LinkedBlockingQueue<>(numLogPages); for (int i = 0; i < numLogPages; i++) { emptyQ.offer(new LogBuffer(txnSubsystem, logPageSize, flushLSN)); } @@ -109,7 +111,7 @@ } appendChannel = getFileChannel(appendLSN.get(), false); 
getAndInitNewPage(INITIAL_LOG_SIZE); - logFlusher = new LogFlusher(this, emptyQ, flushQ); + logFlusher = new LogFlusher(this, emptyQ, flushQ, stashQ); futureLogFlusher = txnSubsystem.getAsterixAppRuntimeContextProvider().getThreadExecutor().submit(logFlusher); if (!flushLogsLogger.isAlive()) { txnSubsystem.getAsterixAppRuntimeContextProvider().getThreadExecutor().execute(flushLogsLogger); @@ -182,8 +184,18 @@ protected void getAndInitNewPage(int logSize) { if (logSize > logPageSize) { + // before creating a new page, we need to stash a normal sized page since our queues have fixed capacity + appendPage = null; + while (appendPage == null) { + try { + appendPage = emptyQ.take(); + stashQ.add(appendPage); + } catch (InterruptedException e) { + //ignore + } + } // for now, alloc a new buffer for each large page - // TODO: pool large pages + // TODO: pool large pages?? appendPage = new LogBuffer(txnSubsystem, logSize, flushLSN); appendPage.setFileChannel(appendChannel); flushQ.offer(appendPage); @@ -299,10 +311,6 @@ e.printStackTrace(); } } - } - - public MutableLong getFlushLSN() { - return flushLSN; } private long initializeLogAnchor(long nextLogFileId) { @@ -440,7 +448,7 @@ } }); if (logFileNames != null && logFileNames.length != 0) { - logFileIds = new ArrayList<Long>(); + logFileIds = new ArrayList<>(); for (String fileName : logFileNames) { logFileIds.add(Long.parseLong(fileName.substring(logFilePrefix.length() + 1))); } @@ -621,14 +629,17 @@ private final LogManager logMgr;//for debugging private final LinkedBlockingQueue<LogBuffer> emptyQ; private final LinkedBlockingQueue<LogBuffer> flushQ; + private final LinkedBlockingQueue<LogBuffer> stashQ; private LogBuffer flushPage; private final AtomicBoolean isStarted; private final AtomicBoolean terminateFlag; - public LogFlusher(LogManager logMgr, LinkedBlockingQueue<LogBuffer> emptyQ, LinkedBlockingQueue<LogBuffer> flushQ) { + public LogFlusher(LogManager logMgr, LinkedBlockingQueue<LogBuffer> emptyQ, 
LinkedBlockingQueue<LogBuffer> flushQ, + LinkedBlockingQueue<LogBuffer> stashQ) { this.logMgr = logMgr; this.emptyQ = emptyQ; this.flushQ = flushQ; + this.stashQ = stashQ; flushPage = null; isStarted = new AtomicBoolean(false); terminateFlag = new AtomicBoolean(false); @@ -680,9 +691,7 @@ } } flushPage.flush(); - if (flushPage.getLogPageSize() == logMgr.getLogPageSize()) { - emptyQ.offer(flushPage); - } + emptyQ.offer(flushPage.getLogPageSize() == logMgr.getLogPageSize() ? flushPage : stashQ.remove()); } } catch (Exception e) { if (LOGGER.isLoggable(Level.INFO)) { -- To view, visit https://asterix-gerrit.ics.uci.edu/1342 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: If6427576a31090a057ee6a3d25e35eef5cdd86f8 Gerrit-PatchSet: 5 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: abdullah alamoudi <[email protected]>
