Michael Blow has submitted this change and it was merged. Change subject: ASTERIXDB-1438: BufferCache spins indefinitely... ......................................................................
ASTERIXDB-1438: BufferCache spins indefinitely... Fixes ASTERIXDB-1438: BufferCache spins indefinitely when cache is exceeded. If no free page is found after three unsuccessful cycles (each traversing the clock pages three times) and waiting for dirty pages to be cleaned, an exception is thrown. Change-Id: I327a7423bd630c96e16601b1a3a2a21f558f9f50 Reviewed-on: https://asterix-gerrit.ics.uci.edu/894 Tested-by: Jenkins <[email protected]> Reviewed-by: Till Westmann <[email protected]> --- M hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AsyncFIFOPageQueueManager.java M hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java M hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/DelayPageCleanerPolicy.java M hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IPageCleanerPolicy.java 4 files changed, 131 insertions(+), 118 deletions(-) Approvals: Till Westmann: Looks good to me, approved Jenkins: Verified diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AsyncFIFOPageQueueManager.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AsyncFIFOPageQueueManager.java index 1bbe6db..114dcb8 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AsyncFIFOPageQueueManager.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AsyncFIFOPageQueueManager.java @@ -125,6 +125,7 @@ } } } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // TODO what do we do here? 
e.printStackTrace(); } @@ -140,6 +141,7 @@ try { entry = queue.take(); } catch(InterruptedException e) { + Thread.currentThread().interrupt(); break; } if (entry.getQueueInfo() != null && entry.getQueueInfo().hasWaiters()){ diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java index 2b6dd73..201526b 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java @@ -54,6 +54,7 @@ private static final int MIN_CLEANED_COUNT_DIFF = 3; private static final int PIN_MAX_WAIT_TIME = 50; + private static final int MAX_PIN_ATTEMPT_CYCLES = 3; public static final boolean DEBUG = false; private final int pageSize; @@ -74,7 +75,7 @@ private ConcurrentHashMap<CachedPage, StackTraceElement[]> pinnedPageOwner; //!DEBUG private IIOReplicationManager ioReplicationManager; - private List<ICachedPageInternal> cachedPages = new ArrayList<>(); + private final List<ICachedPageInternal> cachedPages = new ArrayList<>(); private boolean closed; @@ -94,16 +95,16 @@ this.fileMapManager = fileMapManager; Executor executor = Executors.newCachedThreadPool(threadFactory); - fileInfoMap = new HashMap<Integer, BufferedFileHandle>(); - virtualFiles = new HashSet<Integer>(); + fileInfoMap = new HashMap<>(); + virtualFiles = new HashSet<>(); cleanerThread = new CleanerThread(); executor.execute(cleanerThread); closed = false; fifoWriter = new AsyncFIFOPageQueueManager(this); if( DEBUG ) { - confiscatedPages = new ArrayList<CachedPage>(); - confiscatedPagesOwner = new HashMap<CachedPage, StackTraceElement[]>(); + confiscatedPages = new ArrayList<>(); + confiscatedPagesOwner = new 
HashMap<>(); confiscateLock = new ReentrantLock(); pinnedPageOwner = new ConcurrentHashMap<>(); } @@ -135,7 +136,7 @@ // check whether file has been created and opened int fileId = BufferedFileHandle.getFileId(dpid); - BufferedFileHandle fInfo = null; + BufferedFileHandle fInfo; synchronized (fileInfoMap) { fInfo = fileInfoMap.get(fileId); } @@ -185,7 +186,7 @@ if (DEBUG) { pinSanityCheck(dpid); } - CachedPage cPage = findPage(dpid, false); + CachedPage cPage = findPage(dpid); if (!newPage) { if (DEBUG) { confiscateLock.lock(); @@ -220,13 +221,9 @@ return cPage; } - private boolean isVirtual(long vpid) throws HyracksDataException { - CachedPage virtPage = findPage(vpid, true); - return virtPage.confiscated.get(); - } + private CachedPage findPage(long dpid) throws HyracksDataException { - private CachedPage findPage(long dpid, boolean virtual) throws HyracksDataException { - while (true) { + for (int i = 0; i < MAX_PIN_ATTEMPT_CYCLES; i++) { int startCleanedCount = cleanerThread.cleanedCount; CachedPage cPage = null; @@ -428,11 +425,11 @@ victimBucket.cachedPage = victim.next; } else { CachedPage victimPrev = victimBucket.cachedPage; - while (victimPrev != null && victimPrev.next != victim) { + while (victimPrev.next != victim) { victimPrev = victimPrev.next; - } - if(DEBUG) { - assert victimPrev != null; + if (victimPrev == null) { + throw new IllegalStateException(); + } } victimPrev.next = victim.next; } @@ -449,8 +446,13 @@ return victim; } } - synchronized (cleanerThread) { - pageCleanerPolicy.notifyVictimNotFound(cleanerThread); + synchronized (cleanerThread.threadLock) { + try { + pageCleanerPolicy.notifyVictimNotFound(cleanerThread.threadLock); + } catch (InterruptedException e) { + // Re-interrupt the thread so this gets handled later + Thread.currentThread().interrupt(); + } } // Heuristic optimization. Check whether the cleaner thread has // cleaned pages since we did our last pin attempt. 
@@ -461,12 +463,18 @@ } synchronized (cleanerThread.cleanNotification) { try { - cleanerThread.cleanNotification.wait(PIN_MAX_WAIT_TIME); + // it's OK to not loop on this wait, as we do not rely on any condition to be true on notify + do { + cleanerThread.cleanNotification.wait(PIN_MAX_WAIT_TIME); + } while (false); } catch (InterruptedException e) { - // Do nothing + // Re-interrupt the thread so this gets handled later + Thread.currentThread().interrupt(); } } } + throw new HyracksDataException("Unable to find free page in buffer cache after " + MAX_PIN_ATTEMPT_CYCLES + + " cycles (buffer cache undersized?)"); } private String dumpState() { @@ -651,21 +659,15 @@ } } - private class CleanerThread extends Thread { - private boolean shutdownStart = false; - private boolean shutdownComplete = false; + private class CleanerThread implements Runnable { + private volatile boolean shutdownStart = false; + private volatile boolean shutdownComplete = false; + private final Object threadLock = new Object(); private final Object cleanNotification = new Object(); // Simply keeps incrementing this counter when a page is cleaned. // Used to implement wait-for-cleanerthread heuristic optimizations. // A waiter can detect whether pages have been cleaned. - // No need to make this var volatile or synchronize it's access in any - // way because it is used for heuristics. - private int cleanedCount = 0; - - public CleanerThread() { - setPriority(Thread.NORM_PRIORITY); - setDaemon(true); - } + private volatile int cleanedCount = 0; public void cleanPage(CachedPage cPage, boolean force) { if (cPage.dirty.get() && !cPage.confiscated.get()) { @@ -678,24 +680,7 @@ } if (proceed) { try { - // Make sure page is still dirty. 
- if (!cPage.dirty.get()) { - return; - } - boolean cleaned = true; - try { - write(cPage); - } catch (HyracksDataException e) { - cleaned = false; - } - if (cleaned) { - cPage.dirty.set(false); - cPage.pinCount.decrementAndGet(); - cleanedCount++; - synchronized (cleanNotification) { - cleanNotification.notifyAll(); - } - } + cleanPageLocked(cPage); } finally { if (force) { cPage.latch.writeLock().unlock(); @@ -710,34 +695,63 @@ } } - @Override - public synchronized void run() { + private void cleanPageLocked(CachedPage cPage) { + // Make sure page is still dirty. + if (!cPage.dirty.get()) { + return; + } + boolean cleaned = true; try { - while (true) { - pageCleanerPolicy.notifyCleanCycleStart(this); - int curPage = 0; - while (true) { - synchronized (cachedPages) { - if (curPage >= cachedPages.size()) { - break; - } - CachedPage cPage = (CachedPage) cachedPages.get(curPage); - if (cPage != null) { - cleanPage(cPage, false); - } - } - curPage++; + write(cPage); + } catch (HyracksDataException e) { + LOGGER.log(Level.WARNING, "Unable to write dirty page", e); + cleaned = false; + } + if (cleaned) { + cPage.dirty.set(false); + cPage.pinCount.decrementAndGet(); + // this increment of a volatile is OK as there is only one writer + cleanedCount++; + synchronized (cleanNotification) { + cleanNotification.notifyAll(); + } + } + } + + @Override + public void run() { + synchronized (threadLock) { + try { + while (!shutdownStart) { + runCleanCycle(); } - if (shutdownStart) { + } catch (InterruptedException e) { + + Thread.currentThread().interrupt(); + } finally { + shutdownComplete = true; + threadLock.notifyAll(); + } + } + } + + private void runCleanCycle() throws InterruptedException { + pageCleanerPolicy.notifyCleanCycleStart(threadLock); + int curPage = 0; + while (true) { + synchronized (cachedPages) { + if (curPage >= cachedPages.size()) { break; } - pageCleanerPolicy.notifyCleanCycleFinish(this); + CachedPage cPage = (CachedPage) cachedPages.get(curPage); + if 
(cPage != null) { + cleanPage(cPage, false); + } } - } catch (Exception e) { - e.printStackTrace(); - } finally { - shutdownComplete = true; - notifyAll(); + curPage++; + } + if (!shutdownStart) { + pageCleanerPolicy.notifyCleanCycleFinish(threadLock); } } } @@ -746,29 +760,31 @@ public void close() { closed = true; fifoWriter.destroyQueue(); - synchronized (cleanerThread) { - cleanerThread.shutdownStart = true; - cleanerThread.notifyAll(); - while (!cleanerThread.shutdownComplete) { - try { - cleanerThread.wait(); - } catch (InterruptedException e) { - e.printStackTrace(); + try { + synchronized (cleanerThread.threadLock) { + cleanerThread.shutdownStart = true; + cleanerThread.threadLock.notifyAll(); + while (!cleanerThread.shutdownComplete) { + cleanerThread.threadLock.wait(); } } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } synchronized (fileInfoMap) { - try { - for (Map.Entry<Integer, BufferedFileHandle> entry : fileInfoMap.entrySet()) { + for (Map.Entry<Integer, BufferedFileHandle> entry : fileInfoMap.entrySet()) { + try { boolean fileHasBeenDeleted = entry.getValue().fileHasBeenDeleted(); sweepAndFlush(entry.getKey(), !fileHasBeenDeleted); if (!fileHasBeenDeleted) { ioManager.close(entry.getValue().getFileHandle()); } + } catch (HyracksDataException e) { + if (LOGGER.isLoggable(Level.WARNING)) { + LOGGER.log(Level.WARNING, "Error flushing file id: " + entry.getKey(), e); + } } - } catch (HyracksDataException e) { - e.printStackTrace(); } fileInfoMap.clear(); } @@ -848,8 +864,7 @@ } private void sweepAndFlush(int fileId, boolean flushDirtyPages) throws HyracksDataException { - for (int i = 0; i < pageMap.length; ++i) { - final CacheBucket bucket = pageMap[i]; + for (final CacheBucket bucket : pageMap) { bucket.bucketLock.lock(); try { CachedPage prev = bucket.cachedPage; @@ -882,7 +897,7 @@ private boolean invalidateIfFileIdMatch(int fileId, CachedPage cPage, boolean flushDirtyPages) throws HyracksDataException { if 
(BufferedFileHandle.getFileId(cPage.dpid) == fileId) { - int pinCount = -1; + int pinCount; if (cPage.dirty.get()) { if (flushDirtyPages) { write(cPage); @@ -935,7 +950,7 @@ @Override public void force(int fileId, boolean metadata) throws HyracksDataException { - BufferedFileHandle fInfo = null; + BufferedFileHandle fInfo; synchronized (fileInfoMap) { fInfo = fileInfoMap.get(fileId); } @@ -1016,10 +1031,7 @@ synchronized (cachedPages) { final int cpid = page.getCachedPageId(); if (cpid < cachedPages.size()) { - ICachedPageInternal old = cachedPages.set(cpid, page); - if (DEBUG) { - assert old == null; - } + cachedPages.set(cpid, page); } else { if (cpid > cachedPages.size()) { // 4 > 1 -> [exiting, null, null, null, new] @@ -1135,7 +1147,7 @@ @Override public ICachedPage confiscatePage(long dpid) throws HyracksDataException { - return confiscatePage(dpid, () -> pageReplacementStrategy.findVictim()); + return confiscatePage(dpid, pageReplacementStrategy::findVictim); } @Override @@ -1145,7 +1157,7 @@ private ICachedPage confiscatePage(long dpid, Supplier<ICachedPageInternal> victimSupplier) throws HyracksDataException { - while (true) { + for (int i = 0; i < MAX_PIN_ATTEMPT_CYCLES; i++) { int startCleanedCount = cleanerThread.cleanedCount; ICachedPage returnPage = null; CachedPage victim = (CachedPage) victimSupplier.get(); @@ -1238,8 +1250,13 @@ return returnPage; } // no page available to confiscate. try kicking the cleaner thread. - synchronized (cleanerThread) { - pageCleanerPolicy.notifyVictimNotFound(cleanerThread); + synchronized (cleanerThread.threadLock) { + try { + pageCleanerPolicy.notifyVictimNotFound(cleanerThread.threadLock); + } catch (InterruptedException e) { + // Re-interrupt the thread so this gets handled later + Thread.currentThread().interrupt(); + } } // Heuristic optimization. Check whether the cleaner thread has // cleaned pages since we did our last pin attempt. 
@@ -1250,12 +1267,18 @@ } synchronized (cleanerThread.cleanNotification) { try { - cleanerThread.cleanNotification.wait(PIN_MAX_WAIT_TIME); + // it's OK to not loop on this wait, as we do not rely on any condition to be true on notify + do { + cleanerThread.cleanNotification.wait(PIN_MAX_WAIT_TIME); + } while (false); } catch (InterruptedException e) { - // Do nothing + // Re-interrupt the thread so this gets handled later + Thread.currentThread().interrupt(); } } } + throw new HyracksDataException("Unable to find free page in buffer cache after " + MAX_PIN_ATTEMPT_CYCLES + + " cycles (buffer cache undersized?)"); } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/DelayPageCleanerPolicy.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/DelayPageCleanerPolicy.java index 276e26a..ceb9c36 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/DelayPageCleanerPolicy.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/DelayPageCleanerPolicy.java @@ -18,8 +18,6 @@ */ package org.apache.hyracks.storage.common.buffercache; -import org.apache.hyracks.api.exceptions.HyracksDataException; - public class DelayPageCleanerPolicy implements IPageCleanerPolicy { private long delay; @@ -28,21 +26,16 @@ } @Override - public void notifyCleanCycleStart(Object monitor) throws HyracksDataException { - + public void notifyCleanCycleStart(Object monitor) throws InterruptedException { } @Override - public void notifyCleanCycleFinish(Object monitor) throws HyracksDataException { - try { - monitor.wait(delay); - } catch (InterruptedException e) { - throw new HyracksDataException(e); - } + public void notifyCleanCycleFinish(Object monitor) throws InterruptedException { + monitor.wait(delay); } @Override - public 
void notifyVictimNotFound(Object monitor) throws HyracksDataException { + public void notifyVictimNotFound(Object monitor) { monitor.notifyAll(); } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IPageCleanerPolicy.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IPageCleanerPolicy.java index 897ae75..e62547c 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IPageCleanerPolicy.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IPageCleanerPolicy.java @@ -18,8 +18,6 @@ */ package org.apache.hyracks.storage.common.buffercache; -import org.apache.hyracks.api.exceptions.HyracksDataException; - /** * Allows customization of the page cleaning strategy by the cleaner thread. * @@ -31,25 +29,22 @@ * * @param monitor * - The monitor on which a mutex is held while in this call - * @throws HyracksDataException */ - public void notifyCleanCycleStart(Object monitor) throws HyracksDataException; + void notifyCleanCycleStart(Object monitor) throws InterruptedException; /** * Callback from the cleaner just after the finish of a cleaning cycle. * * @param monitor * - The monitor on which a mutex is held while in this call. - * @throws HyracksDataException */ - public void notifyCleanCycleFinish(Object monitor) throws HyracksDataException; + void notifyCleanCycleFinish(Object monitor) throws InterruptedException; /** * Callback to indicate that no victim was found. * * @param monitor * - The monitor on which a mutex is held while in this call. 
- * @throws HyracksDataException */ - public void notifyVictimNotFound(Object monitor) throws HyracksDataException; + void notifyVictimNotFound(Object monitor) throws InterruptedException; } -- To view, visit https://asterix-gerrit.ics.uci.edu/894 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I327a7423bd630c96e16601b1a3a2a21f558f9f50 Gerrit-PatchSet: 3 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Michael Blow <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]>
