Michael Blow has submitted this change and it was merged. Change subject: ASTERIXDB-1516: Unable to find free page in buffer cache after 3 cycles ......................................................................
ASTERIXDB-1516: Unable to find free page in buffer cache after 3 cycles Update exhaustion logic to be two-tiered: - emit warning when cycle count exceeds warning threshold (3) - fail if cycle count reaches the failure threshold (1000) Change-Id: I46fa6bbda8c2f81e5e570dd6c07e4f4b794ef5bb Reviewed-on: https://asterix-gerrit.ics.uci.edu/1038 Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Murtadha Hubail <hubail...@gmail.com> --- M hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java 1 file changed, 392 insertions(+), 417 deletions(-) Approvals: Murtadha Hubail: Looks good to me, approved Jenkins: Verified; Verified Objections: Jenkins: Violations found diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java index 080c76f..1b75f94 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java @@ -37,7 +37,6 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; @@ -57,7 +56,8 @@ private static final int MIN_CLEANED_COUNT_DIFF = 3; private static final int PIN_MAX_WAIT_TIME = 50; - private static final int MAX_PIN_ATTEMPT_CYCLES = 3; + private static final int PIN_ATTEMPT_CYCLES_WARNING_THRESHOLD = 3; + private static final int MAX_PIN_ATTEMPT_CYCLES = 1000; public static final boolean DEBUG = false; private final int pageSize; @@ -107,7 +107,7 @@ closed = false; fifoWriter = new AsyncFIFOPageQueueManager(this); - if( DEBUG ) { + if ( DEBUG ) { confiscatedPages = new ArrayList<>(); confiscatedPagesOwner = new HashMap<>(); confiscateLock = new ReentrantLock(); @@ -200,7 +200,7 @@ throw new IllegalStateException(); } } - } finally{ + } finally { confiscateLock.unlock(); } } @@ -216,267 +216,227 @@ cPage.valid = true; } pageReplacementStrategy.notifyCachePageAccess(cPage); - if(DEBUG){ + if (DEBUG){ pinnedPageOwner.put(cPage, Thread.currentThread().getStackTrace()); } return cPage; } private CachedPage findPage(long dpid) throws HyracksDataException { + return (CachedPage) getPageLoop(dpid, -1, false); + } - for (int i = 0; i < MAX_PIN_ATTEMPT_CYCLES; i++) { - int startCleanedCount = cleanerThread.cleanedCount; - - CachedPage cPage = null; + private ICachedPage findPageInner(long dpid) { + CachedPage cPage; + /* + * Hash dpid to get a bucket and then check if the page exists in + * the bucket. + */ + int hash = hash(dpid); + CacheBucket bucket = pageMap[hash]; + bucket.bucketLock.lock(); + try { + cPage = bucket.cachedPage; + while (cPage != null) { + if (DEBUG) { + assert bucket.cachedPage != bucket.cachedPage.next; + } + if (cPage.dpid == dpid) { + if (DEBUG) { + assert !cPage.confiscated.get(); + } + cPage.pinCount.incrementAndGet(); + return cPage; + } + cPage = cPage.next; + } + } finally { + bucket.bucketLock.unlock(); + } + /* + * If we got here, the page was not in the hash table. Now we ask + * the page replacement strategy to find us a victim. + */ + CachedPage victim = (CachedPage) pageReplacementStrategy.findVictim(); + if (victim == null) { + return null; + } + /* + * We have a victim with the following invariants. 1. The dpid + * on the CachedPage may or may not be valid. 2. We have a pin + * on the CachedPage. We have to deal with three cases here. + * Case 1: The dpid on the CachedPage is invalid (-1). This + * indicates that this buffer has never been used or is a + * confiscated page. So we are the only ones holding it. Get a lock + * on the required dpid's hash bucket, check if someone inserted + * the page we want into the table. If so, decrement the + * pincount on the victim and return the winner page in the + * table. If such a winner does not exist, insert the victim and + * return it. Case 2: The dpid on the CachedPage is valid. Case + * 2a: The current dpid and required dpid hash to the same + * bucket. Get the bucket lock, check that the victim is still + * at pinCount == 1 If so check if there is a winning CachedPage + * with the required dpid. If so, decrement the pinCount on the + * victim and return the winner. If not, update the contents of + * the CachedPage to hold the required dpid and return it. If + * the picCount on the victim was != 1 or CachedPage was dirty + * someone used the victim for its old contents -- Decrement the + * pinCount and retry. Case 2b: The current dpid and required + * dpid hash to different buckets. Get the two bucket locks in + * the order of the bucket indexes (Ordering prevents + * deadlocks). Check for the existence of a winner in the new + * bucket and for potential use of the victim (pinCount != 1). + * If everything looks good, remove the CachedPage from the old + * bucket, and add it to the new bucket and update its header + * with the new dpid. + */ + if (victim.dpid < 0) { /* - * Hash dpid to get a bucket and then check if the page exists in - * the bucket. + * Case 1. */ - int hash = hash(dpid); - CacheBucket bucket = pageMap[hash]; bucket.bucketLock.lock(); try { - cPage = bucket.cachedPage; - while (cPage != null) { - if(DEBUG) { - assert bucket.cachedPage != bucket.cachedPage.next; - } - if (cPage.dpid == dpid) { - if(DEBUG) { - assert !cPage.confiscated.get(); - } - cPage.pinCount.incrementAndGet(); - return cPage; - } - cPage = cPage.next; + if (!victim.pinCount.compareAndSet(0, 1)) { + return null; } + // now that we have the pin, ensure the victim's dpid still is < 0, if it's not, decrement + // pin count and try again + if (victim.dpid >= 0) { + victim.pinCount.decrementAndGet(); + return null; + } + if (DEBUG) { + confiscateLock.lock(); + try { + if (confiscatedPages.contains(victim)) { + throw new IllegalStateException(); + } + } finally { + confiscateLock.unlock(); + } + } + cPage = findTargetInBucket(dpid, bucket.cachedPage, victim); + if (cPage != null) { + return cPage; + } + victim.reset(dpid); + victim.next = bucket.cachedPage; + bucket.cachedPage = victim; } finally { bucket.bucketLock.unlock(); } - /* - * If we got here, the page was not in the hash table. Now we ask - * the page replacement strategy to find us a victim. - */ - CachedPage victim = (CachedPage) pageReplacementStrategy.findVictim(); - if (victim != null) { - /* - * We have a victim with the following invariants. 1. The dpid - * on the CachedPage may or may not be valid. 2. We have a pin - * on the CachedPage. We have to deal with three cases here. - * Case 1: The dpid on the CachedPage is invalid (-1). This - * indicates that this buffer has never been used or is a - * confiscated page. So we are the only ones holding it. Get a lock - * on the required dpid's hash bucket, check if someone inserted - * the page we want into the table. If so, decrement the - * pincount on the victim and return the winner page in the - * table. If such a winner does not exist, insert the victim and - * return it. Case 2: The dpid on the CachedPage is valid. Case - * 2a: The current dpid and required dpid hash to the same - * bucket. Get the bucket lock, check that the victim is still - * at pinCount == 1 If so check if there is a winning CachedPage - * with the required dpid. If so, decrement the pinCount on the - * victim and return the winner. If not, update the contents of - * the CachedPage to hold the required dpid and return it. If - * the picCount on the victim was != 1 or CachedPage was dirty - * someone used the victim for its old contents -- Decrement the - * pinCount and retry. Case 2b: The current dpid and required - * dpid hash to different buckets. Get the two bucket locks in - * the order of the bucket indexes (Ordering prevents - * deadlocks). Check for the existence of a winner in the new - * bucket and for potential use of the victim (pinCount != 1). - * If everything looks good, remove the CachedPage from the old - * bucket, and add it to the new bucket and update its header - * with the new dpid. - */ - if (victim.dpid < 0) { - /* - * Case 1. - */ - bucket.bucketLock.lock(); - try { - if (!victim.pinCount.compareAndSet(0, 1)) { - continue; - } - // now that we have the pin, ensure the victim's dpid still is < 0, if it's not, decrement - // pin count and try again - if (victim.dpid >= 0) { - victim.pinCount.decrementAndGet(); - continue; - } - if (DEBUG) { - confiscateLock.lock(); - try{ - if (confiscatedPages.contains(victim)) { - throw new IllegalStateException(); - } - } finally{ - confiscateLock.unlock(); - } - } - cPage = bucket.cachedPage; - while (cPage != null) { - if (cPage.dpid == dpid) { - cPage.pinCount.incrementAndGet(); - victim.pinCount.decrementAndGet(); - if(DEBUG) { - assert !cPage.confiscated.get(); - } - return cPage; - } - cPage = cPage.next; - } - victim.reset(dpid); - victim.next = bucket.cachedPage; - bucket.cachedPage = victim; - } finally { - bucket.bucketLock.unlock(); - } - if(DEBUG) { - assert !victim.confiscated.get(); - } - return victim; - } - int victimHash = hash(victim.dpid); - if (victimHash == hash) { - /* - * Case 2a. - */ - bucket.bucketLock.lock(); - try { - if (!victim.pinCount.compareAndSet(0, 1)) { - continue; - } - // now that we have the pin, ensure the victim's bucket hasn't changed, if it has, decrement - // pin count and try again - if (victimHash != hash(victim.dpid)) { - victim.pinCount.decrementAndGet(); - continue; - } - if (DEBUG) { - confiscateLock.lock(); - try{ - if (confiscatedPages.contains(victim)) { - throw new IllegalStateException(); - } - }finally{ - confiscateLock.unlock(); - } - } - cPage = bucket.cachedPage; - while (cPage != null) { - if (cPage.dpid == dpid) { - cPage.pinCount.incrementAndGet(); - victim.pinCount.decrementAndGet(); - if(DEBUG) { - assert !victim.confiscated.get(); - } - return cPage; - } - cPage = cPage.next; - } - victim.reset(dpid); - } finally { - bucket.bucketLock.unlock(); - } - if(DEBUG) { - assert !victim.confiscated.get(); - } - return victim; - } else { - /* - * Case 2b. - */ - CacheBucket victimBucket = pageMap[victimHash]; - if (victimHash < hash) { - victimBucket.bucketLock.lock(); - bucket.bucketLock.lock(); - } else { - bucket.bucketLock.lock(); - victimBucket.bucketLock.lock(); - } - try { - if (!victim.pinCount.compareAndSet(0, 1)) { - continue; - } - // now that we have the pin, ensure the victim's bucket hasn't changed, if it has, decrement - // pin count and try again - if (victimHash != hash(victim.dpid)) { - victim.pinCount.decrementAndGet(); - continue; - } - if (DEBUG) { - if (confiscatedPages.contains(victim)) { - throw new IllegalStateException(); - } - } - cPage = bucket.cachedPage; - while (cPage != null) { - if (cPage.dpid == dpid) { - cPage.pinCount.incrementAndGet(); - victim.pinCount.decrementAndGet(); - if(DEBUG) { - assert !cPage.confiscated.get(); - } - return cPage; - } - cPage = cPage.next; - } - if (victimBucket.cachedPage == victim) { - victimBucket.cachedPage = victim.next; - } else { - CachedPage victimPrev = victimBucket.cachedPage; - while (victimPrev.next != victim) { - victimPrev = victimPrev.next; - if (victimPrev == null) { - throw new IllegalStateException(); - } - } - victimPrev.next = victim.next; - } - victim.reset(dpid); - victim.next = bucket.cachedPage; - bucket.cachedPage = victim; - } finally { - victimBucket.bucketLock.unlock(); - bucket.bucketLock.unlock(); - } - if(DEBUG) { - assert !victim.confiscated.get(); - } - return victim; - } + if (DEBUG) { + assert !victim.confiscated.get(); } - synchronized (cleanerThread.threadLock) { - try { - pageCleanerPolicy.notifyVictimNotFound(cleanerThread.threadLock); - } catch (InterruptedException e) { - // Re-interrupt the thread so this gets handled later - Thread.currentThread().interrupt(); - } - } - // Heuristic optimization. Check whether the cleaner thread has - // cleaned pages since we did our last pin attempt. - if (cleanerThread.cleanedCount - startCleanedCount > MIN_CLEANED_COUNT_DIFF) { - // Don't go to sleep and wait for notification from the cleaner, - // just try to pin again immediately. - continue; - } - synchronized (cleanerThread.cleanNotification) { - try { - // it's OK to not loop on this wait, as we do not rely on any condition to be true on notify - do { - cleanerThread.cleanNotification.wait(PIN_MAX_WAIT_TIME); - } while (false); - } catch (InterruptedException e) { - // Re-interrupt the thread so this gets handled later - Thread.currentThread().interrupt(); - } - } - finishQueue(); + return victim; } - throw new HyracksDataException("Unable to find free page in buffer cache after " + MAX_PIN_ATTEMPT_CYCLES + - " cycles (buffer cache undersized?)"); + int victimHash = hash(victim.dpid); + if (victimHash == hash) { + /* + * Case 2a. + */ + bucket.bucketLock.lock(); + try { + if (!victim.pinCount.compareAndSet(0, 1)) { + return null; + } + // now that we have the pin, ensure the victim's bucket hasn't changed, if it has, decrement + // pin count and try again + if (victimHash != hash(victim.dpid)) { + victim.pinCount.decrementAndGet(); + return null; + } + if (DEBUG) { + confiscateLock.lock(); + try { + if (confiscatedPages.contains(victim)) { + throw new IllegalStateException(); + } + } finally { + confiscateLock.unlock(); + } + } + cPage = findTargetInBucket(dpid, bucket.cachedPage, victim); + if (cPage != null) { + return cPage; + } + victim.reset(dpid); + } finally { + bucket.bucketLock.unlock(); + } + if (DEBUG) { + assert !victim.confiscated.get(); + } + return victim; + } else { + /* + * Case 2b. + */ + CacheBucket victimBucket = pageMap[victimHash]; + if (victimHash < hash) { + victimBucket.bucketLock.lock(); + bucket.bucketLock.lock(); + } else { + bucket.bucketLock.lock(); + victimBucket.bucketLock.lock(); + } + try { + if (!victim.pinCount.compareAndSet(0, 1)) { + return null; + } + // now that we have the pin, ensure the victim's bucket hasn't changed, if it has, decrement + // pin count and try again + if (victimHash != hash(victim.dpid)) { + victim.pinCount.decrementAndGet(); + return null; + } + if (DEBUG && confiscatedPages.contains(victim)) { + throw new IllegalStateException(); + } + cPage = findTargetInBucket(dpid, bucket.cachedPage, victim); + if (cPage != null) { + return cPage; + } + if (victimBucket.cachedPage == victim) { + victimBucket.cachedPage = victim.next; + } else { + CachedPage victimPrev = victimBucket.cachedPage; + while (victimPrev.next != victim) { + victimPrev = victimPrev.next; + if (victimPrev == null) { + throw new IllegalStateException(); + } + } + victimPrev.next = victim.next; + } + victim.reset(dpid); + victim.next = bucket.cachedPage; + bucket.cachedPage = victim; + } finally { + victimBucket.bucketLock.unlock(); + bucket.bucketLock.unlock(); + } + if (DEBUG) { + assert !victim.confiscated.get(); + } + return victim; + } + } + + private CachedPage findTargetInBucket(long dpid, CachedPage cPage, CachedPage victim) { + while (cPage != null) { + if (cPage.dpid == dpid) { + cPage.pinCount.incrementAndGet(); + victim.pinCount.decrementAndGet(); + if (DEBUG) { + assert !cPage.confiscated.get(); + } + break; + } + cPage = cPage.next; + } + return cPage; } private String dumpState() { @@ -494,30 +454,30 @@ cb.bucketLock.lock(); try { CachedPage cp = cb.cachedPage; - if (cp != null) { - buffer.append(" ").append(i).append('\n'); - while (cp != null) { - buffer.append(" ").append(cp.cpid).append(" -> [") - .append(BufferedFileHandle.getFileId(cp.dpid)).append(':') - .append(BufferedFileHandle.getPageId(cp.dpid)).append(", ").append(cp.pinCount.get()) - .append(", ").append(cp.valid ? "valid" : "invalid").append(", ") - .append(cp.confiscated.get() ? "confiscated" : "physical").append(", ") - .append(cp.dirty.get() ? "dirty" : "clean").append("]\n"); - cp = cp.next; - ++nCachedPages; - } + if (cp == null) { + continue; + } + buffer.append(" ").append(i).append('\n'); + while (cp != null) { + buffer.append(" ").append(cp.cpid).append(" -> [") + .append(BufferedFileHandle.getFileId(cp.dpid)).append(':') + .append(BufferedFileHandle.getPageId(cp.dpid)).append(", ").append(cp.pinCount.get()) + .append(", ").append(cp.valid ? "valid" : "invalid").append(", ") + .append(cp.confiscated.get() ? "confiscated" : "physical").append(", ") + .append(cp.dirty.get() ? "dirty" : "clean").append("]\n"); + cp = cp.next; + ++nCachedPages; } } finally { cb.bucketLock.unlock(); } } buffer.append("Number of cached pages: ").append(nCachedPages).append('\n'); - if(DEBUG){ + if (DEBUG){ confiscateLock.lock(); - try{ + try { buffer.append("Number of confiscated pages: ").append(confiscatedPages.size()).append('\n'); - } - finally{ + } finally { confiscateLock.unlock(); } } @@ -534,13 +494,13 @@ c.latch.getReadLockCount() != 0 || c.latch.getWriteHoldCount() != 0) { return false; } - if(c.valid){ + if (c.valid){ reachableDpids.add(c.dpid); } } } for(Long l: reachableDpids){ - if(!canFindValidCachedPage(l)){ + if (!canFindValidCachedPage(l)){ return false; } } @@ -918,12 +878,10 @@ } } // Take care of the head of the chain. - if (bucket.cachedPage != null) { - if (invalidateIfFileIdMatch(fileId, bucket.cachedPage, flushDirtyPages)) { - CachedPage cPage = bucket.cachedPage; - bucket.cachedPage = bucket.cachedPage.next; - cPage.next = null; - } + if (bucket.cachedPage != null && invalidateIfFileIdMatch(fileId, bucket.cachedPage, flushDirtyPages)) { + CachedPage cPage = bucket.cachedPage; + bucket.cachedPage = bucket.cachedPage.next; + cPage.next = null; } } finally { bucket.bucketLock.unlock(); @@ -1115,30 +1073,26 @@ boolean found = false; //traverse the bucket's linked list to find the victim. while (curr != null) { - if (curr == victim) { // we found where the victim + if (curr == victim) { + // we found where the victim // resides in the hash table - // if this is the first page in the bucket + if (DEBUG) { + assert curr != curr.next; + } if (prev == null) { - if(DEBUG) { - assert curr != curr.next; - } - bucket.cachedPage = bucket.cachedPage.next; - found = true; - break; + // if this is the first page in the bucket + bucket.cachedPage = curr.next; + } else { // if it isn't we need to make the previous // node point to where it should - } else { - if(DEBUG) { - assert curr.next != curr; - } prev.next = curr.next; - curr.next = null; - if(DEBUG) { + if (DEBUG) { assert prev.next != prev; } - found = true; - break; } + curr.next = null; + found = true; + break; } // go to the next entry prev = curr; @@ -1170,7 +1124,7 @@ if (fInfo == null) { throw new HyracksDataException("No such file mapped for fileId:" + fileId); } - if(DEBUG) { + if (DEBUG) { assert ioManager.getSize(fInfo.getFileHandle()) % getPageSizeWithHeader() == 0; } return (int) (ioManager.getSize(fInfo.getFileHandle()) / getPageSizeWithHeader()); @@ -1184,141 +1138,162 @@ @Override public ICachedPage confiscatePage(long dpid) throws HyracksDataException { - return confiscatePage(dpid, pageReplacementStrategy::findVictim); + return confiscatePage(dpid, 1); } @Override public ICachedPage confiscateLargePage(long dpid, int multiplier, int extraBlockPageId) throws HyracksDataException { - ICachedPage cachedPage = confiscatePage(dpid, () -> pageReplacementStrategy.findVictim(multiplier)); + ICachedPage cachedPage = confiscatePage(dpid, multiplier); ((ICachedPageInternal)cachedPage).setExtraBlockPageId(extraBlockPageId); return cachedPage; } - private ICachedPage confiscatePage(long dpid, Supplier<ICachedPageInternal> victimSupplier) + private ICachedPage confiscatePage(long dpid, int multiplier) throws HyracksDataException { - for (int i = 0; i < MAX_PIN_ATTEMPT_CYCLES; i++) { - int startCleanedCount = cleanerThread.cleanedCount; - ICachedPage returnPage = null; - CachedPage victim = (CachedPage) victimSupplier.get(); - if (victim != null) { - if(DEBUG) { - assert !victim.confiscated.get(); + return getPageLoop(dpid, multiplier, true); + } + + private ICachedPage confiscateInner(long dpid, int multiplier) { + ICachedPage returnPage = null; + CachedPage victim = (CachedPage) pageReplacementStrategy.findVictim(multiplier); + if (victim == null) { + return victim; + } + if (DEBUG) { + assert !victim.confiscated.get(); + } + // find a page that would possibly be evicted anyway + // Case 1 from findPage() + if (victim.dpid < 0) { // new page + if (!victim.pinCount.compareAndSet(0, 1)) { + return null; + } + // now that we have the pin, ensure the victim's dpid still is < 0, if it's not, decrement + // pin count and try again + if (victim.dpid >= 0) { + victim.pinCount.decrementAndGet(); + return null; + } + returnPage = victim; + ((CachedPage) returnPage).dpid = dpid; + } else { + // Case 2a/b + int pageHash = hash(victim.getDiskPageId()); + CacheBucket bucket = pageMap[pageHash]; + bucket.bucketLock.lock(); + try { + // readjust the next pointers to remove this page from + // the pagemap + CachedPage curr = bucket.cachedPage; + CachedPage prev = null; + boolean found = false; + //traverse the bucket's linked list to find the victim. + while (curr != null) { + if (curr == victim) { // we found where the victim + // resides in the hash table + if (!victim.pinCount.compareAndSet(0, 1)) { + break; + } + if (DEBUG) { + assert curr != curr.next; + } + if (prev == null) { + // if this is the first page in the bucket + bucket.cachedPage = curr.next; + // if it isn't we need to make the previous + // node point to where it should + } else { + prev.next = curr.next; + if (DEBUG) { + assert prev.next != prev; + } + } + curr.next = null; + found = true; + break; + } + // go to the next entry + prev = curr; + curr = curr.next; } - // find a page that would possibly be evicted anyway - // Case 1 from findPage() - if (victim.dpid < 0) { // new page - if (!victim.pinCount.compareAndSet(0, 1)) { - continue; - } - // now that we have the pin, ensure the victim's dpid still is < 0, if it's not, decrement - // pin count and try again - if (victim.dpid >= 0) { - victim.pinCount.decrementAndGet(); - continue; - } + if (found) { returnPage = victim; ((CachedPage) returnPage).dpid = dpid; - } else { - // Case 2a/b - int pageHash = hash(victim.getDiskPageId()); - CacheBucket bucket = pageMap[pageHash]; - bucket.bucketLock.lock(); - try { - // readjust the next pointers to remove this page from - // the pagemap - CachedPage curr = bucket.cachedPage; - CachedPage prev = null; - boolean found = false; - //traverse the bucket's linked list to find the victim. - while (curr != null) { - if (curr == victim) { // we found where the victim - // resides in the hash table - if (!victim.pinCount.compareAndSet(0, 1)) { - break; - } - // if this is the first page in the bucket - if (prev == null) { - if(DEBUG) { - assert curr != curr.next; - } - bucket.cachedPage = bucket.cachedPage.next; - found = true; - break; - // if it isn't we need to make the previous - // node point to where it should - } else { - if(DEBUG) { - assert curr.next != curr; - } - prev.next = curr.next; - curr.next = null; - if(DEBUG) { - assert prev.next != prev; - } - found = true; - break; - } - } - // go to the next entry - prev = curr; - curr = curr.next; - } - if (found) { - returnPage = victim; - ((CachedPage) returnPage).dpid = dpid; - } //otherwise, someone took the same victim before we acquired the lock. try again! - } finally { - bucket.bucketLock.unlock(); - } - } + } //otherwise, someone took the same victim before we acquired the lock. try again! + } finally { + bucket.bucketLock.unlock(); } - // if we found a page after all that, go ahead and finish - if (returnPage != null) { - ((CachedPage) returnPage).confiscated.set(true); - if (DEBUG) { - confiscateLock.lock(); - try{ - confiscatedPages.add((CachedPage) returnPage); - confiscatedPagesOwner.put((CachedPage) returnPage, Thread.currentThread().getStackTrace()); - } - finally{ - confiscateLock.unlock(); - } - } - return returnPage; - } - // no page available to confiscate. try kicking the cleaner thread. - synchronized (cleanerThread.threadLock) { - try { - pageCleanerPolicy.notifyVictimNotFound(cleanerThread.threadLock); - } catch (InterruptedException e) { - // Re-interrupt the thread so this gets handled later - Thread.currentThread().interrupt(); - } - } - // Heuristic optimization. Check whether the cleaner thread has - // cleaned pages since we did our last pin attempt. - if (cleanerThread.cleanedCount - startCleanedCount > MIN_CLEANED_COUNT_DIFF) { - // Don't go to sleep and wait for notification from the cleaner, - // just try to pin again immediately. - continue; - } - synchronized (cleanerThread.cleanNotification) { - try { - // it's OK to not loop on this wait, as we do not rely on any condition to be true on notify - do { - cleanerThread.cleanNotification.wait(PIN_MAX_WAIT_TIME); - } while (false); - } catch (InterruptedException e) { - // Re-interrupt the thread so this gets handled later - Thread.currentThread().interrupt(); - } - } - finishQueue(); } - throw new HyracksDataException("Unable to find free page in buffer cache after " + MAX_PIN_ATTEMPT_CYCLES + - " cycles (buffer cache undersized?)"); + // if we found a page after all that, go ahead and finish + if (returnPage != null) { + ((CachedPage) returnPage).confiscated.set(true); + if (DEBUG) { + confiscateLock.lock(); + try { + confiscatedPages.add((CachedPage) returnPage); + confiscatedPagesOwner.put((CachedPage) returnPage, Thread.currentThread().getStackTrace()); + } finally { + confiscateLock.unlock(); + } + } + return returnPage; + } + return null; + } + + private ICachedPage getPageLoop(long dpid, int multiplier, boolean confiscate) + throws HyracksDataException { + int cycleCount = 0; + try { + while (true) { + cycleCount++; + int startCleanedCount = cleanerThread.cleanedCount; + ICachedPage page = confiscate ? confiscateInner(dpid, multiplier) : findPageInner(dpid); + if (page != null) { + return page; + } + // no page available to confiscate. try kicking the cleaner thread. + synchronized (cleanerThread.threadLock) { + try { + pageCleanerPolicy.notifyVictimNotFound(cleanerThread.threadLock); + } catch (InterruptedException e) { + // Re-interrupt the thread so this gets handled later + Thread.currentThread().interrupt(); + } + } + // Heuristic optimization. Check whether the cleaner thread has + // cleaned pages since we did our last pin attempt. + if (cleanerThread.cleanedCount - startCleanedCount > MIN_CLEANED_COUNT_DIFF) { + // Don't go to sleep and wait for notification from the cleaner, + // just try to pin again immediately. + continue; + } + synchronized (cleanerThread.cleanNotification) { + try { + // it's OK to not loop on this wait, as we do not rely on any condition to be true on notify + // This seemingly pointless loop keeps SonarQube happy + do { + cleanerThread.cleanNotification.wait(PIN_MAX_WAIT_TIME); + } while (false); + } catch (InterruptedException e) { + // Re-interrupt the thread so this gets handled later + Thread.currentThread().interrupt(); + } + } + finishQueue(); + if (cycleCount > MAX_PIN_ATTEMPT_CYCLES) { + cycleCount = 0; // suppress warning below + throw new HyracksDataException("Unable to find free page in buffer cache after " + + MAX_PIN_ATTEMPT_CYCLES + " cycles (buffer cache undersized?)"); + } + } + } finally { + if (cycleCount > PIN_ATTEMPT_CYCLES_WARNING_THRESHOLD && LOGGER.isLoggable(Level.WARNING)) { + LOGGER.warning("Took " + cycleCount + " cycles to find free page in buffer cache. (buffer cache " + + "undersized?)"); + } + } } @Override @@ -1329,15 +1304,15 @@ @Override public void returnPage(ICachedPage page, boolean reinsert) { CachedPage cPage = (CachedPage) page; - CacheBucket bucket = null; - if(!page.confiscated()){ + CacheBucket bucket; + if (!page.confiscated()){ return; } if (reinsert) { int hash = hash(cPage.dpid); bucket = pageMap[hash]; bucket.bucketLock.lock(); - if(DEBUG) { + if (DEBUG) { confiscateLock.lock(); } try { @@ -1346,7 +1321,7 @@ cPage.next = bucket.cachedPage; bucket.cachedPage = cPage; cPage.pinCount.decrementAndGet(); - if(DEBUG){ + if (DEBUG){ assert cPage.pinCount.get() == 0 ; assert cPage.latch.getReadLockCount() == 0; assert cPage.latch.getWriteHoldCount() == 0; @@ -1355,22 +1330,22 @@ } } finally { bucket.bucketLock.unlock(); - if(DEBUG) { + if (DEBUG) { confiscateLock.unlock(); } } } else { cPage.invalidate(); cPage.pinCount.decrementAndGet(); - if(DEBUG){ + if (DEBUG){ assert cPage.pinCount.get() == 0; assert cPage.latch.getReadLockCount() == 0; assert cPage.latch.getWriteHoldCount() == 0; confiscateLock.lock(); - try{ + try { confiscatedPages.remove(cPage); confiscatedPagesOwner.remove(cPage); - } finally{ + } finally { confiscateLock.unlock(); } } @@ -1421,7 +1396,7 @@ public void purgeHandle(int fileId) throws HyracksDataException{ synchronized(fileInfoMap){ BufferedFileHandle fh = fileInfoMap.get(fileId); - if(fh != null){ + if (fh != null){ ioManager.close(fh.getFileHandle()); fileInfoMap.remove(fileId); fileMapManager.unregisterFile(fileId); -- To view, visit https://asterix-gerrit.ics.uci.edu/1038 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I46fa6bbda8c2f81e5e570dd6c07e4f4b794ef5bb Gerrit-PatchSet: 4 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Michael Blow <mb...@apache.org> Gerrit-Reviewer: Ian Maxon <ima...@apache.org> Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Gerrit-Reviewer: Michael Blow <mb...@apache.org> Gerrit-Reviewer: Murtadha Hubail <hubail...@gmail.com> Gerrit-Reviewer: Till Westmann <ti...@apache.org> Gerrit-Reviewer: Yingyi Bu <buyin...@gmail.com>