Michael Blow has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/1038
Change subject: ASTERIXDB-1516: Unable to find free page in buffer cache after 3 cycles ...................................................................... ASTERIXDB-1516: Unable to find free page in buffer cache after 3 cycles Update exhaustion logic to be two-tiered: - emit warning when cycle count exceeds warning threshold (3) - fail if cycle count reaches the failure threshold (1000) Change-Id: I46fa6bbda8c2f81e5e570dd6c07e4f4b794ef5bb --- M hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java 1 file changed, 355 insertions(+), 357 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/38/1038/1 diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java index 080c76f..e994ad1 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java @@ -57,7 +57,8 @@ private static final int MIN_CLEANED_COUNT_DIFF = 3; private static final int PIN_MAX_WAIT_TIME = 50; - private static final int MAX_PIN_ATTEMPT_CYCLES = 3; + private static final int PIN_ATTEMPT_CYCLES_WARNING_THRESHOLD = 3; + private static final int MAX_PIN_ATTEMPT_CYCLES = 1000; public static final boolean DEBUG = false; private final int pageSize; @@ -223,260 +224,231 @@ } private CachedPage findPage(long dpid) throws HyracksDataException { + return (CachedPage) getPageLoop(() -> findPageInner(dpid)); + } - for (int i = 0; i < MAX_PIN_ATTEMPT_CYCLES; i++) { - int startCleanedCount = cleanerThread.cleanedCount; - - CachedPage cPage = null; + private ICachedPage findPageInner(long dpid) { + CachedPage cPage; + /* + * Hash dpid to get a bucket and then check if the page exists in + * the bucket. + */ + int hash = hash(dpid); + CacheBucket bucket = pageMap[hash]; + bucket.bucketLock.lock(); + try { + cPage = bucket.cachedPage; + while (cPage != null) { + if (DEBUG) { + assert bucket.cachedPage != bucket.cachedPage.next; + } + if (cPage.dpid == dpid) { + if (DEBUG) { + assert !cPage.confiscated.get(); + } + cPage.pinCount.incrementAndGet(); + return cPage; + } + cPage = cPage.next; + } + } finally { + bucket.bucketLock.unlock(); + } + /* + * If we got here, the page was not in the hash table. Now we ask + * the page replacement strategy to find us a victim. + */ + CachedPage victim = (CachedPage) pageReplacementStrategy.findVictim(); + if (victim == null) { + return null; + } + /* + * We have a victim with the following invariants. 1. The dpid + * on the CachedPage may or may not be valid. 2. We have a pin + * on the CachedPage. We have to deal with three cases here. + * Case 1: The dpid on the CachedPage is invalid (-1). This + * indicates that this buffer has never been used or is a + * confiscated page. So we are the only ones holding it. Get a lock + * on the required dpid's hash bucket, check if someone inserted + * the page we want into the table. If so, decrement the + * pincount on the victim and return the winner page in the + * table. If such a winner does not exist, insert the victim and + * return it. Case 2: The dpid on the CachedPage is valid. Case + * 2a: The current dpid and required dpid hash to the same + * bucket. Get the bucket lock, check that the victim is still + * at pinCount == 1 If so check if there is a winning CachedPage + * with the required dpid. If so, decrement the pinCount on the + * victim and return the winner. If not, update the contents of + * the CachedPage to hold the required dpid and return it. If + * the picCount on the victim was != 1 or CachedPage was dirty + * someone used the victim for its old contents -- Decrement the + * pinCount and retry. Case 2b: The current dpid and required + * dpid hash to different buckets. Get the two bucket locks in + * the order of the bucket indexes (Ordering prevents + * deadlocks). Check for the existence of a winner in the new + * bucket and for potential use of the victim (pinCount != 1). + * If everything looks good, remove the CachedPage from the old + * bucket, and add it to the new bucket and update its header + * with the new dpid. + */ + if (victim.dpid < 0) { /* - * Hash dpid to get a bucket and then check if the page exists in - * the bucket. + * Case 1. */ - int hash = hash(dpid); - CacheBucket bucket = pageMap[hash]; bucket.bucketLock.lock(); try { + if (!victim.pinCount.compareAndSet(0, 1)) { + return null; + } + // now that we have the pin, ensure the victim's dpid still is < 0, if it's not, decrement + // pin count and try again + if (victim.dpid >= 0) { + victim.pinCount.decrementAndGet(); + return null; + } + if (DEBUG) { + confiscateLock.lock(); + try { + if (confiscatedPages.contains(victim)) { + throw new IllegalStateException(); + } + } finally { + confiscateLock.unlock(); + } + } cPage = bucket.cachedPage; while (cPage != null) { - if(DEBUG) { - assert bucket.cachedPage != bucket.cachedPage.next; - } if (cPage.dpid == dpid) { - if(DEBUG) { + cPage.pinCount.incrementAndGet(); + victim.pinCount.decrementAndGet(); + if (DEBUG) { assert !cPage.confiscated.get(); } - cPage.pinCount.incrementAndGet(); return cPage; } cPage = cPage.next; } + victim.reset(dpid); + victim.next = bucket.cachedPage; + bucket.cachedPage = victim; } finally { bucket.bucketLock.unlock(); } - /* - * If we got here, the page was not in the hash table. Now we ask - * the page replacement strategy to find us a victim. - */ - CachedPage victim = (CachedPage) pageReplacementStrategy.findVictim(); - if (victim != null) { - /* - * We have a victim with the following invariants. 1. The dpid - * on the CachedPage may or may not be valid. 2. We have a pin - * on the CachedPage. We have to deal with three cases here. - * Case 1: The dpid on the CachedPage is invalid (-1). This - * indicates that this buffer has never been used or is a - * confiscated page. So we are the only ones holding it. Get a lock - * on the required dpid's hash bucket, check if someone inserted - * the page we want into the table. If so, decrement the - * pincount on the victim and return the winner page in the - * table. If such a winner does not exist, insert the victim and - * return it. Case 2: The dpid on the CachedPage is valid. Case - * 2a: The current dpid and required dpid hash to the same - * bucket. Get the bucket lock, check that the victim is still - * at pinCount == 1 If so check if there is a winning CachedPage - * with the required dpid. If so, decrement the pinCount on the - * victim and return the winner. If not, update the contents of - * the CachedPage to hold the required dpid and return it. If - * the picCount on the victim was != 1 or CachedPage was dirty - * someone used the victim for its old contents -- Decrement the - * pinCount and retry. Case 2b: The current dpid and required - * dpid hash to different buckets. Get the two bucket locks in - * the order of the bucket indexes (Ordering prevents - * deadlocks). Check for the existence of a winner in the new - * bucket and for potential use of the victim (pinCount != 1). - * If everything looks good, remove the CachedPage from the old - * bucket, and add it to the new bucket and update its header - * with the new dpid. - */ - if (victim.dpid < 0) { - /* - * Case 1. - */ - bucket.bucketLock.lock(); - try { - if (!victim.pinCount.compareAndSet(0, 1)) { - continue; - } - // now that we have the pin, ensure the victim's dpid still is < 0, if it's not, decrement - // pin count and try again - if (victim.dpid >= 0) { - victim.pinCount.decrementAndGet(); - continue; - } - if (DEBUG) { - confiscateLock.lock(); - try{ - if (confiscatedPages.contains(victim)) { - throw new IllegalStateException(); - } - } finally{ - confiscateLock.unlock(); - } - } - cPage = bucket.cachedPage; - while (cPage != null) { - if (cPage.dpid == dpid) { - cPage.pinCount.incrementAndGet(); - victim.pinCount.decrementAndGet(); - if(DEBUG) { - assert !cPage.confiscated.get(); - } - return cPage; - } - cPage = cPage.next; - } - victim.reset(dpid); - victim.next = bucket.cachedPage; - bucket.cachedPage = victim; - } finally { - bucket.bucketLock.unlock(); - } - if(DEBUG) { - assert !victim.confiscated.get(); - } - return victim; - } - int victimHash = hash(victim.dpid); - if (victimHash == hash) { - /* - * Case 2a. - */ - bucket.bucketLock.lock(); - try { - if (!victim.pinCount.compareAndSet(0, 1)) { - continue; - } - // now that we have the pin, ensure the victim's bucket hasn't changed, if it has, decrement - // pin count and try again - if (victimHash != hash(victim.dpid)) { - victim.pinCount.decrementAndGet(); - continue; - } - if (DEBUG) { - confiscateLock.lock(); - try{ - if (confiscatedPages.contains(victim)) { - throw new IllegalStateException(); - } - }finally{ - confiscateLock.unlock(); - } - } - cPage = bucket.cachedPage; - while (cPage != null) { - if (cPage.dpid == dpid) { - cPage.pinCount.incrementAndGet(); - victim.pinCount.decrementAndGet(); - if(DEBUG) { - assert !victim.confiscated.get(); - } - return cPage; - } - cPage = cPage.next; - } - victim.reset(dpid); - } finally { - bucket.bucketLock.unlock(); - } - if(DEBUG) { - assert !victim.confiscated.get(); - } - return victim; - } else { - /* - * Case 2b. - */ - CacheBucket victimBucket = pageMap[victimHash]; - if (victimHash < hash) { - victimBucket.bucketLock.lock(); - bucket.bucketLock.lock(); - } else { - bucket.bucketLock.lock(); - victimBucket.bucketLock.lock(); - } - try { - if (!victim.pinCount.compareAndSet(0, 1)) { - continue; - } - // now that we have the pin, ensure the victim's bucket hasn't changed, if it has, decrement - // pin count and try again - if (victimHash != hash(victim.dpid)) { - victim.pinCount.decrementAndGet(); - continue; - } - if (DEBUG) { - if (confiscatedPages.contains(victim)) { - throw new IllegalStateException(); - } - } - cPage = bucket.cachedPage; - while (cPage != null) { - if (cPage.dpid == dpid) { - cPage.pinCount.incrementAndGet(); - victim.pinCount.decrementAndGet(); - if(DEBUG) { - assert !cPage.confiscated.get(); - } - return cPage; - } - cPage = cPage.next; - } - if (victimBucket.cachedPage == victim) { - victimBucket.cachedPage = victim.next; - } else { - CachedPage victimPrev = victimBucket.cachedPage; - while (victimPrev.next != victim) { - victimPrev = victimPrev.next; - if (victimPrev == null) { - throw new IllegalStateException(); - } - } - victimPrev.next = victim.next; - } - victim.reset(dpid); - victim.next = bucket.cachedPage; - bucket.cachedPage = victim; - } finally { - victimBucket.bucketLock.unlock(); - bucket.bucketLock.unlock(); - } - if(DEBUG) { - assert !victim.confiscated.get(); - } - return victim; - } + if (DEBUG) { + assert !victim.confiscated.get(); } - synchronized (cleanerThread.threadLock) { - try { - pageCleanerPolicy.notifyVictimNotFound(cleanerThread.threadLock); - } catch (InterruptedException e) { - // Re-interrupt the thread so this gets handled later - Thread.currentThread().interrupt(); - } - } - // Heuristic optimization. Check whether the cleaner thread has - // cleaned pages since we did our last pin attempt. - if (cleanerThread.cleanedCount - startCleanedCount > MIN_CLEANED_COUNT_DIFF) { - // Don't go to sleep and wait for notification from the cleaner, - // just try to pin again immediately. - continue; - } - synchronized (cleanerThread.cleanNotification) { - try { - // it's OK to not loop on this wait, as we do not rely on any condition to be true on notify - do { - cleanerThread.cleanNotification.wait(PIN_MAX_WAIT_TIME); - } while (false); - } catch (InterruptedException e) { - // Re-interrupt the thread so this gets handled later - Thread.currentThread().interrupt(); - } - } - finishQueue(); + return victim; } - throw new HyracksDataException("Unable to find free page in buffer cache after " + MAX_PIN_ATTEMPT_CYCLES + - " cycles (buffer cache undersized?)"); + int victimHash = hash(victim.dpid); + if (victimHash == hash) { + /* + * Case 2a. + */ + bucket.bucketLock.lock(); + try { + if (!victim.pinCount.compareAndSet(0, 1)) { + return null; + } + // now that we have the pin, ensure the victim's bucket hasn't changed, if it has, decrement + // pin count and try again + if (victimHash != hash(victim.dpid)) { + victim.pinCount.decrementAndGet(); + return null; + } + if (DEBUG) { + confiscateLock.lock(); + try { + if (confiscatedPages.contains(victim)) { + throw new IllegalStateException(); + } + } finally { + confiscateLock.unlock(); + } + } + cPage = bucket.cachedPage; + while (cPage != null) { + if (cPage.dpid == dpid) { + cPage.pinCount.incrementAndGet(); + victim.pinCount.decrementAndGet(); + if (DEBUG) { + assert !victim.confiscated.get(); + } + return cPage; + } + cPage = cPage.next; + } + victim.reset(dpid); + } finally { + bucket.bucketLock.unlock(); + } + if (DEBUG) { + assert !victim.confiscated.get(); + } + return victim; + } else { + /* + * Case 2b. + */ + CacheBucket victimBucket = pageMap[victimHash]; + if (victimHash < hash) { + victimBucket.bucketLock.lock(); + bucket.bucketLock.lock(); + } else { + bucket.bucketLock.lock(); + victimBucket.bucketLock.lock(); + } + try { + if (!victim.pinCount.compareAndSet(0, 1)) { + return null; + } + // now that we have the pin, ensure the victim's bucket hasn't changed, if it has, decrement + // pin count and try again + if (victimHash != hash(victim.dpid)) { + victim.pinCount.decrementAndGet(); + return null; + } + if (DEBUG) { + if (confiscatedPages.contains(victim)) { + throw new IllegalStateException(); + } + } + cPage = bucket.cachedPage; + while (cPage != null) { + if (cPage.dpid == dpid) { + cPage.pinCount.incrementAndGet(); + victim.pinCount.decrementAndGet(); + if (DEBUG) { + assert !cPage.confiscated.get(); + } + return cPage; + } + cPage = cPage.next; + } + if (victimBucket.cachedPage == victim) { + victimBucket.cachedPage = victim.next; + } else { + CachedPage victimPrev = victimBucket.cachedPage; + while (victimPrev.next != victim) { + victimPrev = victimPrev.next; + if (victimPrev == null) { + throw new IllegalStateException(); + } + } + victimPrev.next = victim.next; + } + victim.reset(dpid); + victim.next = bucket.cachedPage; + bucket.cachedPage = victim; + } finally { + victimBucket.bucketLock.unlock(); + bucket.bucketLock.unlock(); + } + if (DEBUG) { + assert !victim.confiscated.get(); + } + return victim; + } } private String dumpState() { @@ -1197,130 +1169,156 @@ private ICachedPage confiscatePage(long dpid, Supplier<ICachedPageInternal> victimSupplier) throws HyracksDataException { - for (int i = 0; i < MAX_PIN_ATTEMPT_CYCLES; i++) { - int startCleanedCount = cleanerThread.cleanedCount; - ICachedPage returnPage = null; - CachedPage victim = (CachedPage) victimSupplier.get(); - if (victim != null) { - if(DEBUG) { - assert !victim.confiscated.get(); - } - // find a page that would possibly be evicted anyway - // Case 1 from findPage() - if (victim.dpid < 0) { // new page - if (!victim.pinCount.compareAndSet(0, 1)) { - continue; - } - // now that we have the pin, ensure the victim's dpid still is < 0, if it's not, decrement - // pin count and try again - if (victim.dpid >= 0) { - victim.pinCount.decrementAndGet(); - continue; - } - returnPage = victim; - ((CachedPage) returnPage).dpid = dpid; - } else { - // Case 2a/b - int pageHash = hash(victim.getDiskPageId()); - CacheBucket bucket = pageMap[pageHash]; - bucket.bucketLock.lock(); - try { - // readjust the next pointers to remove this page from - // the pagemap - CachedPage curr = bucket.cachedPage; - CachedPage prev = null; - boolean found = false; - //traverse the bucket's linked list to find the victim. - while (curr != null) { - if (curr == victim) { // we found where the victim - // resides in the hash table - if (!victim.pinCount.compareAndSet(0, 1)) { - break; - } - // if this is the first page in the bucket - if (prev == null) { - if(DEBUG) { - assert curr != curr.next; - } - bucket.cachedPage = bucket.cachedPage.next; - found = true; - break; - // if it isn't we need to make the previous - // node point to where it should - } else { - if(DEBUG) { - assert curr.next != curr; - } - prev.next = curr.next; - curr.next = null; - if(DEBUG) { - assert prev.next != prev; - } - found = true; - break; - } - } - // go to the next entry - prev = curr; - curr = curr.next; - } - if (found) { - returnPage = victim; - ((CachedPage) returnPage).dpid = dpid; - } //otherwise, someone took the same victim before we acquired the lock. try again! - } finally { - bucket.bucketLock.unlock(); - } - } - } - // if we found a page after all that, go ahead and finish - if (returnPage != null) { - ((CachedPage) returnPage).confiscated.set(true); - if (DEBUG) { - confiscateLock.lock(); - try{ - confiscatedPages.add((CachedPage) returnPage); - confiscatedPagesOwner.put((CachedPage) returnPage, Thread.currentThread().getStackTrace()); - } - finally{ - confiscateLock.unlock(); - } - } - return returnPage; - } - // no page available to confiscate. try kicking the cleaner thread. - synchronized (cleanerThread.threadLock) { - try { - pageCleanerPolicy.notifyVictimNotFound(cleanerThread.threadLock); - } catch (InterruptedException e) { - // Re-interrupt the thread so this gets handled later - Thread.currentThread().interrupt(); - } - } - // Heuristic optimization. Check whether the cleaner thread has - // cleaned pages since we did our last pin attempt. - if (cleanerThread.cleanedCount - startCleanedCount > MIN_CLEANED_COUNT_DIFF) { - // Don't go to sleep and wait for notification from the cleaner, - // just try to pin again immediately. - continue; - } - synchronized (cleanerThread.cleanNotification) { - try { - // it's OK to not loop on this wait, as we do not rely on any condition to be true on notify - do { - cleanerThread.cleanNotification.wait(PIN_MAX_WAIT_TIME); - } while (false); - } catch (InterruptedException e) { - // Re-interrupt the thread so this gets handled later - Thread.currentThread().interrupt(); - } - } - finishQueue(); - } - throw new HyracksDataException("Unable to find free page in buffer cache after " + MAX_PIN_ATTEMPT_CYCLES + - " cycles (buffer cache undersized?)"); + return getPageLoop(() -> confiscateInner(dpid, victimSupplier)); } + private ICachedPage confiscateInner(long dpid, Supplier<ICachedPageInternal> victimSupplier) { + ICachedPage returnPage = null; + CachedPage victim = (CachedPage) victimSupplier.get(); + if (victim != null) { + if(DEBUG) { + assert !victim.confiscated.get(); + } + // find a page that would possibly be evicted anyway + // Case 1 from findPage() + if (victim.dpid < 0) { // new page + if (!victim.pinCount.compareAndSet(0, 1)) { + return null; + } + // now that we have the pin, ensure the victim's dpid still is < 0, if it's not, decrement + // pin count and try again + if (victim.dpid >= 0) { + victim.pinCount.decrementAndGet(); + return null; + } + returnPage = victim; + ((CachedPage) returnPage).dpid = dpid; + } else { + // Case 2a/b + int pageHash = hash(victim.getDiskPageId()); + CacheBucket bucket = pageMap[pageHash]; + bucket.bucketLock.lock(); + try { + // readjust the next pointers to remove this page from + // the pagemap + CachedPage curr = bucket.cachedPage; + CachedPage prev = null; + boolean found = false; + //traverse the bucket's linked list to find the victim. + while (curr != null) { + if (curr == victim) { // we found where the victim + // resides in the hash table + if (!victim.pinCount.compareAndSet(0, 1)) { + break; + } + // if this is the first page in the bucket + if (prev == null) { + if(DEBUG) { + assert curr != curr.next; + } + bucket.cachedPage = bucket.cachedPage.next; + found = true; + break; + // if it isn't we need to make the previous + // node point to where it should + } else { + if(DEBUG) { + assert curr.next != curr; + } + prev.next = curr.next; + curr.next = null; + if(DEBUG) { + assert prev.next != prev; + } + found = true; + break; + } + } + // go to the next entry + prev = curr; + curr = curr.next; + } + if (found) { + returnPage = victim; + ((CachedPage) returnPage).dpid = dpid; + } //otherwise, someone took the same victim before we acquired the lock. try again! + } finally { + bucket.bucketLock.unlock(); + } + } + } + // if we found a page after all that, go ahead and finish + if (returnPage != null) { + ((CachedPage) returnPage).confiscated.set(true); + if (DEBUG) { + confiscateLock.lock(); + try{ + confiscatedPages.add((CachedPage) returnPage); + confiscatedPagesOwner.put((CachedPage) returnPage, Thread.currentThread().getStackTrace()); + } + finally{ + confiscateLock.unlock(); + } + } + return returnPage; + } + return null; + } + + private ICachedPage getPageLoop(Supplier<ICachedPage> cachedPageSupplier) + throws HyracksDataException { + int cycleCount = 0; + try { + while (true) { + cycleCount++; + int startCleanedCount = cleanerThread.cleanedCount; + ICachedPage page = cachedPageSupplier.get(); + if (page != null) { + return page; + } + // no page available to confiscate. try kicking the cleaner thread. + synchronized (cleanerThread.threadLock) { + try { + pageCleanerPolicy.notifyVictimNotFound(cleanerThread.threadLock); + } catch (InterruptedException e) { + // Re-interrupt the thread so this gets handled later + Thread.currentThread().interrupt(); + } + } + // Heuristic optimization. Check whether the cleaner thread has + // cleaned pages since we did our last pin attempt. + if (cleanerThread.cleanedCount - startCleanedCount > MIN_CLEANED_COUNT_DIFF) { + // Don't go to sleep and wait for notification from the cleaner, + // just try to pin again immediately. + continue; + } + synchronized (cleanerThread.cleanNotification) { + try { + // it's OK to not loop on this wait, as we do not rely on any condition to be true on notify + do { + cleanerThread.cleanNotification.wait(PIN_MAX_WAIT_TIME); + } while (false); + } catch (InterruptedException e) { + // Re-interrupt the thread so this gets handled later + Thread.currentThread().interrupt(); + } + } + finishQueue(); + if (cycleCount > MAX_PIN_ATTEMPT_CYCLES) { + cycleCount = 0; // suppress warning below + throw new HyracksDataException("Unable to find free page in buffer cache after " + + MAX_PIN_ATTEMPT_CYCLES + " cycles (buffer cache undersized?)"); + } + } + } finally { + if (cycleCount > PIN_ATTEMPT_CYCLES_WARNING_THRESHOLD && LOGGER.isLoggable(Level.WARNING)) { + LOGGER.warning("Took " + cycleCount + " cycles to find free page in buffer cache. (buffer cache " + + "undersized?)"); + } + } + } + + @Override public void returnPage(ICachedPage page) { returnPage(page, true); -- To view, visit https://asterix-gerrit.ics.uci.edu/1038 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I46fa6bbda8c2f81e5e570dd6c07e4f4b794ef5bb Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Michael Blow <mb...@apache.org>