Michael Blow has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1038

Change subject: ASTERIXDB-1516: Unable to find free page in buffer cache after 3 cycles
......................................................................

ASTERIXDB-1516: Unable to find free page in buffer cache after 3 cycles

Update exhaustion logic to be two-tiered:
- emit warning when cycle count exceeds warning threshold (3)
- fail if cycle count reaches the failure threshold (1000)
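
For illustration only, the two tiers described above might be sketched as the following retry loop. This is not the BufferCache code from this change: the class name TwoTierRetry, the retry method, the warn callback, and the use of IllegalStateException (standing in for HyracksDataException) are all hypothetical. Only the two threshold values come from the description, and warning just once, on the first cycle past the warning threshold, is an assumption.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

// Illustrative sketch only; names here are hypothetical, not from BufferCache.
final class TwoTierRetry {
    // Threshold values taken from the change description.
    static final int WARNING_THRESHOLD = 3;
    static final int FAILURE_THRESHOLD = 1000;

    // Calls attempt until it yields a non-null result or the failure
    // threshold is reached.
    static <T> T retry(Supplier<T> attempt, Consumer<String> warn) {
        int cycleCount = 0;
        while (true) {
            cycleCount++;
            T result = attempt.get();
            if (result != null) {
                return result;
            }
            // Second tier: give up once the failure threshold is reached.
            if (cycleCount >= FAILURE_THRESHOLD) {
                throw new IllegalStateException("no result after " + cycleCount + " cycles");
            }
            // First tier (assumption: warn once, on the first cycle past the threshold).
            if (cycleCount == WARNING_THRESHOLD + 1) {
                warn.accept("still retrying after " + WARNING_THRESHOLD + " cycles");
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // The supplier returns null four times, then a value on the fifth call.
        String page = retry(() -> ++calls[0] < 5 ? null : "page", System.err::println);
        System.out.println(page + " after " + calls[0] + " attempts");
    }
}
```

Passing the attempt as a Supplier keeps the retry policy in one place, so several callers can share the same thresholds.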

Change-Id: I46fa6bbda8c2f81e5e570dd6c07e4f4b794ef5bb
---
M hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
1 file changed, 355 insertions(+), 357 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/38/1038/1

diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index 080c76f..e994ad1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -57,7 +57,8 @@
 
     private static final int MIN_CLEANED_COUNT_DIFF = 3;
     private static final int PIN_MAX_WAIT_TIME = 50;
-    private static final int MAX_PIN_ATTEMPT_CYCLES = 3;
+    private static final int PIN_ATTEMPT_CYCLES_WARNING_THRESHOLD = 3;
+    private static final int MAX_PIN_ATTEMPT_CYCLES = 1000;
     public static final boolean DEBUG = false;
 
     private final int pageSize;
@@ -223,260 +224,231 @@
     }
 
     private CachedPage findPage(long dpid) throws HyracksDataException {
+        return (CachedPage) getPageLoop(() -> findPageInner(dpid));
+    }
 
-        for (int i = 0; i < MAX_PIN_ATTEMPT_CYCLES; i++) {
-            int startCleanedCount = cleanerThread.cleanedCount;
-
-            CachedPage cPage = null;
+    private ICachedPage findPageInner(long dpid) {
+        CachedPage cPage;
+        /*
+         * Hash dpid to get a bucket and then check if the page exists in
+         * the bucket.
+         */
+        int hash = hash(dpid);
+        CacheBucket bucket = pageMap[hash];
+        bucket.bucketLock.lock();
+        try {
+            cPage = bucket.cachedPage;
+            while (cPage != null) {
+                if (DEBUG) {
+                    assert bucket.cachedPage != bucket.cachedPage.next;
+                }
+                if (cPage.dpid == dpid) {
+                    if (DEBUG) {
+                        assert !cPage.confiscated.get();
+                    }
+                    cPage.pinCount.incrementAndGet();
+                    return cPage;
+                }
+                cPage = cPage.next;
+            }
+        } finally {
+            bucket.bucketLock.unlock();
+        }
+        /*
+         * If we got here, the page was not in the hash table. Now we ask
+         * the page replacement strategy to find us a victim.
+         */
+        CachedPage victim = (CachedPage) pageReplacementStrategy.findVictim();
+        if (victim == null) {
+            return null;
+        }
+        /*
+         * We have a victim with the following invariants. 1. The dpid
+         * on the CachedPage may or may not be valid. 2. We have a pin
+         * on the CachedPage. We have to deal with three cases here.
+         * Case 1: The dpid on the CachedPage is invalid (-1). This
+         * indicates that this buffer has never been used or is a
+         * confiscated page. So we are the only ones holding it. Get a lock
+         * on the required dpid's hash bucket, check if someone inserted
+         * the page we want into the table. If so, decrement the
+         * pincount on the victim and return the winner page in the
+         * table. If such a winner does not exist, insert the victim and
+         * return it. Case 2: The dpid on the CachedPage is valid. Case
+         * 2a: The current dpid and required dpid hash to the same
+         * bucket. Get the bucket lock, check that the victim is still
+         * at pinCount == 1 If so check if there is a winning CachedPage
+         * with the required dpid. If so, decrement the pinCount on the
+         * victim and return the winner. If not, update the contents of
+         * the CachedPage to hold the required dpid and return it. If
+         * the picCount on the victim was != 1 or CachedPage was dirty
+         * someone used the victim for its old contents -- Decrement the
+         * pinCount and retry. Case 2b: The current dpid and required
+         * dpid hash to different buckets. Get the two bucket locks in
+         * the order of the bucket indexes (Ordering prevents
+         * deadlocks). Check for the existence of a winner in the new
+         * bucket and for potential use of the victim (pinCount != 1).
+         * If everything looks good, remove the CachedPage from the old
+         * bucket, and add it to the new bucket and update its header
+         * with the new dpid.
+         */
+        if (victim.dpid < 0) {
             /*
-             * Hash dpid to get a bucket and then check if the page exists in
-             * the bucket.
+             * Case 1.
              */
-            int hash = hash(dpid);
-            CacheBucket bucket = pageMap[hash];
             bucket.bucketLock.lock();
             try {
+                if (!victim.pinCount.compareAndSet(0, 1)) {
+                    return null;
+                }
+                // now that we have the pin, ensure the victim's dpid still is < 0, if it's not, decrement
+                // pin count and try again
+                if (victim.dpid >= 0) {
+                    victim.pinCount.decrementAndGet();
+                    return null;
+                }
+                if (DEBUG) {
+                    confiscateLock.lock();
+                    try {
+                        if (confiscatedPages.contains(victim)) {
+                            throw new IllegalStateException();
+                        }
+                    } finally {
+                        confiscateLock.unlock();
+                    }
+                }
                 cPage = bucket.cachedPage;
                 while (cPage != null) {
-                    if(DEBUG) {
-                        assert bucket.cachedPage != bucket.cachedPage.next;
-                    }
                     if (cPage.dpid == dpid) {
-                        if(DEBUG) {
+                        cPage.pinCount.incrementAndGet();
+                        victim.pinCount.decrementAndGet();
+                        if (DEBUG) {
                             assert !cPage.confiscated.get();
                         }
-                        cPage.pinCount.incrementAndGet();
                         return cPage;
                     }
                     cPage = cPage.next;
                 }
+                victim.reset(dpid);
+                victim.next = bucket.cachedPage;
+                bucket.cachedPage = victim;
             } finally {
                 bucket.bucketLock.unlock();
             }
-            /*
-             * If we got here, the page was not in the hash table. Now we ask
-             * the page replacement strategy to find us a victim.
-             */
-            CachedPage victim = (CachedPage) pageReplacementStrategy.findVictim();
-            if (victim != null) {
-                /*
-                 * We have a victim with the following invariants. 1. The dpid
-                 * on the CachedPage may or may not be valid. 2. We have a pin
-                 * on the CachedPage. We have to deal with three cases here.
-                 * Case 1: The dpid on the CachedPage is invalid (-1). This
-                 * indicates that this buffer has never been used or is a
-                 * confiscated page. So we are the only ones holding it. Get a lock
-                 * on the required dpid's hash bucket, check if someone inserted
-                 * the page we want into the table. If so, decrement the
-                 * pincount on the victim and return the winner page in the
-                 * table. If such a winner does not exist, insert the victim and
-                 * return it. Case 2: The dpid on the CachedPage is valid. Case
-                 * 2a: The current dpid and required dpid hash to the same
-                 * bucket. Get the bucket lock, check that the victim is still
-                 * at pinCount == 1 If so check if there is a winning CachedPage
-                 * with the required dpid. If so, decrement the pinCount on the
-                 * victim and return the winner. If not, update the contents of
-                 * the CachedPage to hold the required dpid and return it. If
-                 * the picCount on the victim was != 1 or CachedPage was dirty
-                 * someone used the victim for its old contents -- Decrement the
-                 * pinCount and retry. Case 2b: The current dpid and required
-                 * dpid hash to different buckets. Get the two bucket locks in
-                 * the order of the bucket indexes (Ordering prevents
-                 * deadlocks). Check for the existence of a winner in the new
-                 * bucket and for potential use of the victim (pinCount != 1).
-                 * If everything looks good, remove the CachedPage from the old
-                 * bucket, and add it to the new bucket and update its header
-                 * with the new dpid.
-                 */
-                if (victim.dpid < 0) {
-                    /*
-                     * Case 1.
-                     */
-                    bucket.bucketLock.lock();
-                    try {
-                        if (!victim.pinCount.compareAndSet(0, 1)) {
-                            continue;
-                        }
-                        // now that we have the pin, ensure the victim's dpid still is < 0, if it's not, decrement
-                        // pin count and try again
-                        if (victim.dpid >= 0) {
-                            victim.pinCount.decrementAndGet();
-                            continue;
-                        }
-                        if (DEBUG) {
-                            confiscateLock.lock();
-                            try{
-                                if (confiscatedPages.contains(victim)) {
-                                    throw new IllegalStateException();
-                                }
-                            } finally{
-                                confiscateLock.unlock();
-                            }
-                        }
-                        cPage = bucket.cachedPage;
-                        while (cPage != null) {
-                            if (cPage.dpid == dpid) {
-                                cPage.pinCount.incrementAndGet();
-                                victim.pinCount.decrementAndGet();
-                                if(DEBUG) {
-                                    assert !cPage.confiscated.get();
-                                }
-                                return cPage;
-                            }
-                            cPage = cPage.next;
-                        }
-                        victim.reset(dpid);
-                        victim.next = bucket.cachedPage;
-                        bucket.cachedPage = victim;
-                    } finally {
-                        bucket.bucketLock.unlock();
-                    }
 
-                    if(DEBUG) {
-                        assert !victim.confiscated.get();
-                    }
-                    return victim;
-                }
-                int victimHash = hash(victim.dpid);
-                if (victimHash == hash) {
-                    /*
-                     * Case 2a.
-                     */
-                    bucket.bucketLock.lock();
-                    try {
-                        if (!victim.pinCount.compareAndSet(0, 1)) {
-                            continue;
-                        }
-                        // now that we have the pin, ensure the victim's bucket hasn't changed, if it has, decrement
-                        // pin count and try again
-                        if (victimHash != hash(victim.dpid)) {
-                            victim.pinCount.decrementAndGet();
-                            continue;
-                        }
-                        if (DEBUG) {
-                            confiscateLock.lock();
-                            try{
-                                if (confiscatedPages.contains(victim)) {
-                                    throw new IllegalStateException();
-                                }
-                            }finally{
-                                confiscateLock.unlock();
-                            }
-                        }
-                        cPage = bucket.cachedPage;
-                        while (cPage != null) {
-                            if (cPage.dpid == dpid) {
-                                cPage.pinCount.incrementAndGet();
-                                victim.pinCount.decrementAndGet();
-                                if(DEBUG) {
-                                    assert !victim.confiscated.get();
-                                }
-                                return cPage;
-                            }
-                            cPage = cPage.next;
-                        }
-                        victim.reset(dpid);
-                    } finally {
-                        bucket.bucketLock.unlock();
-                    }
-                    if(DEBUG) {
-                        assert !victim.confiscated.get();
-                    }
-                    return victim;
-                } else {
-                    /*
-                     * Case 2b.
-                     */
-                    CacheBucket victimBucket = pageMap[victimHash];
-                    if (victimHash < hash) {
-                        victimBucket.bucketLock.lock();
-                        bucket.bucketLock.lock();
-                    } else {
-                        bucket.bucketLock.lock();
-                        victimBucket.bucketLock.lock();
-                    }
-                    try {
-                        if (!victim.pinCount.compareAndSet(0, 1)) {
-                            continue;
-                        }
-                        // now that we have the pin, ensure the victim's bucket hasn't changed, if it has, decrement
-                        // pin count and try again
-                        if (victimHash != hash(victim.dpid)) {
-                            victim.pinCount.decrementAndGet();
-                            continue;
-                        }
-                        if (DEBUG) {
-                            if (confiscatedPages.contains(victim)) {
-                                throw new IllegalStateException();
-                            }
-                        }
-                        cPage = bucket.cachedPage;
-                        while (cPage != null) {
-                            if (cPage.dpid == dpid) {
-                                cPage.pinCount.incrementAndGet();
-                                victim.pinCount.decrementAndGet();
-                                if(DEBUG) {
-                                    assert !cPage.confiscated.get();
-                                }
-                                return cPage;
-                            }
-                            cPage = cPage.next;
-                        }
-                        if (victimBucket.cachedPage == victim) {
-                            victimBucket.cachedPage = victim.next;
-                        } else {
-                            CachedPage victimPrev = victimBucket.cachedPage;
-                            while (victimPrev.next != victim) {
-                                victimPrev = victimPrev.next;
-                                if (victimPrev == null) {
-                                    throw new IllegalStateException();
-                                }
-                            }
-                            victimPrev.next = victim.next;
-                        }
-                        victim.reset(dpid);
-                        victim.next = bucket.cachedPage;
-                        bucket.cachedPage = victim;
-                    } finally {
-                        victimBucket.bucketLock.unlock();
-                        bucket.bucketLock.unlock();
-                    }
-                    if(DEBUG) {
-                        assert !victim.confiscated.get();
-                    }
-                    return victim;
-                }
+            if (DEBUG) {
+                assert !victim.confiscated.get();
             }
-            synchronized (cleanerThread.threadLock) {
-                try {
-                    pageCleanerPolicy.notifyVictimNotFound(cleanerThread.threadLock);
-                } catch (InterruptedException e) {
-                    // Re-interrupt the thread so this gets handled later
-                    Thread.currentThread().interrupt();
-                }
-            }
-            // Heuristic optimization. Check whether the cleaner thread has
-            // cleaned pages since we did our last pin attempt.
-            if (cleanerThread.cleanedCount - startCleanedCount > MIN_CLEANED_COUNT_DIFF) {
-                // Don't go to sleep and wait for notification from the cleaner,
-                // just try to pin again immediately.
-                continue;
-            }
-            synchronized (cleanerThread.cleanNotification) {
-                try {
-                    // it's OK to not loop on this wait, as we do not rely on any condition to be true on notify
-                    do {
-                        cleanerThread.cleanNotification.wait(PIN_MAX_WAIT_TIME);
-                    } while (false);
-                } catch (InterruptedException e) {
-                    // Re-interrupt the thread so this gets handled later
-                    Thread.currentThread().interrupt();
-                }
-            }
-            finishQueue();
+            return victim;
         }
-        throw new HyracksDataException("Unable to find free page in buffer cache after " + MAX_PIN_ATTEMPT_CYCLES +
-                " cycles (buffer cache undersized?)");
+        int victimHash = hash(victim.dpid);
+        if (victimHash == hash) {
+        /*
+         * Case 2a.
+         */
+            bucket.bucketLock.lock();
+            try {
+                if (!victim.pinCount.compareAndSet(0, 1)) {
+                    return null;
+                }
+                // now that we have the pin, ensure the victim's bucket hasn't changed, if it has, decrement
+                // pin count and try again
+                if (victimHash != hash(victim.dpid)) {
+                    victim.pinCount.decrementAndGet();
+                    return null;
+                }
+                if (DEBUG) {
+                    confiscateLock.lock();
+                    try {
+                        if (confiscatedPages.contains(victim)) {
+                            throw new IllegalStateException();
+                        }
+                    } finally {
+                        confiscateLock.unlock();
+                    }
+                }
+                cPage = bucket.cachedPage;
+                while (cPage != null) {
+                    if (cPage.dpid == dpid) {
+                        cPage.pinCount.incrementAndGet();
+                        victim.pinCount.decrementAndGet();
+                        if (DEBUG) {
+                            assert !victim.confiscated.get();
+                        }
+                        return cPage;
+                    }
+                    cPage = cPage.next;
+                }
+                victim.reset(dpid);
+            } finally {
+                bucket.bucketLock.unlock();
+            }
+            if (DEBUG) {
+                assert !victim.confiscated.get();
+            }
+            return victim;
+        } else {
+        /*
+         * Case 2b.
+         */
+            CacheBucket victimBucket = pageMap[victimHash];
+            if (victimHash < hash) {
+                victimBucket.bucketLock.lock();
+                bucket.bucketLock.lock();
+            } else {
+                bucket.bucketLock.lock();
+                victimBucket.bucketLock.lock();
+            }
+            try {
+                if (!victim.pinCount.compareAndSet(0, 1)) {
+                    return null;
+                }
+                // now that we have the pin, ensure the victim's bucket hasn't changed, if it has, decrement
+                // pin count and try again
+                if (victimHash != hash(victim.dpid)) {
+                    victim.pinCount.decrementAndGet();
+                    return null;
+                }
+                if (DEBUG) {
+                    if (confiscatedPages.contains(victim)) {
+                        throw new IllegalStateException();
+                    }
+                }
+                cPage = bucket.cachedPage;
+                while (cPage != null) {
+                    if (cPage.dpid == dpid) {
+                        cPage.pinCount.incrementAndGet();
+                        victim.pinCount.decrementAndGet();
+                        if (DEBUG) {
+                            assert !cPage.confiscated.get();
+                        }
+                        return cPage;
+                    }
+                    cPage = cPage.next;
+                }
+                if (victimBucket.cachedPage == victim) {
+                    victimBucket.cachedPage = victim.next;
+                } else {
+                    CachedPage victimPrev = victimBucket.cachedPage;
+                    while (victimPrev.next != victim) {
+                        victimPrev = victimPrev.next;
+                        if (victimPrev == null) {
+                            throw new IllegalStateException();
+                        }
+                    }
+                    victimPrev.next = victim.next;
+                }
+                victim.reset(dpid);
+                victim.next = bucket.cachedPage;
+                bucket.cachedPage = victim;
+            } finally {
+                victimBucket.bucketLock.unlock();
+                bucket.bucketLock.unlock();
+            }
+            if (DEBUG) {
+                assert !victim.confiscated.get();
+            }
+            return victim;
+        }
     }
 
     private String dumpState() {
@@ -1197,130 +1169,156 @@
 
     private ICachedPage confiscatePage(long dpid, Supplier<ICachedPageInternal> victimSupplier)
             throws HyracksDataException {
-        for (int i = 0; i < MAX_PIN_ATTEMPT_CYCLES; i++) {
-            int startCleanedCount = cleanerThread.cleanedCount;
-            ICachedPage returnPage = null;
-            CachedPage victim = (CachedPage) victimSupplier.get();
-            if (victim != null) {
-                if(DEBUG) {
-                    assert !victim.confiscated.get();
-                }
-                // find a page that would possibly be evicted anyway
-                // Case 1 from findPage()
-                if (victim.dpid < 0) { // new page
-                    if (!victim.pinCount.compareAndSet(0, 1)) {
-                        continue;
-                    }
-                    // now that we have the pin, ensure the victim's dpid still is < 0, if it's not, decrement
-                    // pin count and try again
-                    if (victim.dpid >= 0) {
-                        victim.pinCount.decrementAndGet();
-                        continue;
-                    }
-                    returnPage = victim;
-                    ((CachedPage) returnPage).dpid = dpid;
-                } else {
-                    // Case 2a/b
-                    int pageHash = hash(victim.getDiskPageId());
-                    CacheBucket bucket = pageMap[pageHash];
-                    bucket.bucketLock.lock();
-                    try {
-                        // readjust the next pointers to remove this page from
-                        // the pagemap
-                        CachedPage curr = bucket.cachedPage;
-                        CachedPage prev = null;
-                        boolean found = false;
-                        //traverse the bucket's linked list to find the victim.
-                        while (curr != null) {
-                            if (curr == victim) { // we found where the victim
-                                                  // resides in the hash table
-                                if (!victim.pinCount.compareAndSet(0, 1)) {
-                                    break;
-                                }
-                                // if this is the first page in the bucket
-                                if (prev == null) {
-                                    if(DEBUG) {
-                                        assert curr != curr.next;
-                                    }
-                                    bucket.cachedPage = bucket.cachedPage.next;
-                                    found = true;
-                                    break;
-                                    // if it isn't we need to make the previous
-                                    // node point to where it should
-                                } else {
-                                    if(DEBUG) {
-                                        assert curr.next != curr;
-                                    }
-                                    prev.next = curr.next;
-                                    curr.next = null;
-                                    if(DEBUG) {
-                                        assert prev.next != prev;
-                                    }
-                                    found = true;
-                                    break;
-                                }
-                            }
-                            // go to the next entry
-                            prev = curr;
-                            curr = curr.next;
-                        }
-                        if (found) {
-                            returnPage = victim;
-                            ((CachedPage) returnPage).dpid = dpid;
-                        } //otherwise, someone took the same victim before we acquired the lock. try again!
-                    } finally {
-                        bucket.bucketLock.unlock();
-                    }
-                }
-            }
-            // if we found a page after all that, go ahead and finish
-            if (returnPage != null) {
-                ((CachedPage) returnPage).confiscated.set(true);
-                if (DEBUG) {
-                    confiscateLock.lock();
-                    try{
-                        confiscatedPages.add((CachedPage) returnPage);
-                        confiscatedPagesOwner.put((CachedPage) returnPage, Thread.currentThread().getStackTrace());
-                    }
-                    finally{
-                        confiscateLock.unlock();
-                    }
-                }
-                return returnPage;
-            }
-            // no page available to confiscate. try kicking the cleaner thread.
-            synchronized (cleanerThread.threadLock) {
-                try {
-                    pageCleanerPolicy.notifyVictimNotFound(cleanerThread.threadLock);
-                } catch (InterruptedException e) {
-                    // Re-interrupt the thread so this gets handled later
-                    Thread.currentThread().interrupt();
-                }
-            }
-            // Heuristic optimization. Check whether the cleaner thread has
-            // cleaned pages since we did our last pin attempt.
-            if (cleanerThread.cleanedCount - startCleanedCount > MIN_CLEANED_COUNT_DIFF) {
-                // Don't go to sleep and wait for notification from the cleaner,
-                // just try to pin again immediately.
-                continue;
-            }
-            synchronized (cleanerThread.cleanNotification) {
-                try {
-                    // it's OK to not loop on this wait, as we do not rely on any condition to be true on notify
-                    do {
-                        cleanerThread.cleanNotification.wait(PIN_MAX_WAIT_TIME);
-                    } while (false);
-                } catch (InterruptedException e) {
-                    // Re-interrupt the thread so this gets handled later
-                    Thread.currentThread().interrupt();
-                }
-            }
-            finishQueue();
-        }
-        throw new HyracksDataException("Unable to find free page in buffer cache after " + MAX_PIN_ATTEMPT_CYCLES +
-                " cycles (buffer cache undersized?)");
+        return getPageLoop(() -> confiscateInner(dpid, victimSupplier));
     }
 
+    private ICachedPage confiscateInner(long dpid, Supplier<ICachedPageInternal> victimSupplier) {
+        ICachedPage returnPage = null;
+        CachedPage victim = (CachedPage) victimSupplier.get();
+        if (victim != null) {
+            if(DEBUG) {
+                assert !victim.confiscated.get();
+            }
+            // find a page that would possibly be evicted anyway
+            // Case 1 from findPage()
+            if (victim.dpid < 0) { // new page
+                if (!victim.pinCount.compareAndSet(0, 1)) {
+                    return null;
+                }
+                // now that we have the pin, ensure the victim's dpid still is < 0, if it's not, decrement
+                // pin count and try again
+                if (victim.dpid >= 0) {
+                    victim.pinCount.decrementAndGet();
+                    return null;
+                }
+                returnPage = victim;
+                ((CachedPage) returnPage).dpid = dpid;
+            } else {
+                // Case 2a/b
+                int pageHash = hash(victim.getDiskPageId());
+                CacheBucket bucket = pageMap[pageHash];
+                bucket.bucketLock.lock();
+                try {
+                    // readjust the next pointers to remove this page from
+                    // the pagemap
+                    CachedPage curr = bucket.cachedPage;
+                    CachedPage prev = null;
+                    boolean found = false;
+                    //traverse the bucket's linked list to find the victim.
+                    while (curr != null) {
+                        if (curr == victim) { // we found where the victim
+                            // resides in the hash table
+                            if (!victim.pinCount.compareAndSet(0, 1)) {
+                                break;
+                            }
+                            // if this is the first page in the bucket
+                            if (prev == null) {
+                                if(DEBUG) {
+                                    assert curr != curr.next;
+                                }
+                                bucket.cachedPage = bucket.cachedPage.next;
+                                found = true;
+                                break;
+                                // if it isn't we need to make the previous
+                                // node point to where it should
+                            } else {
+                                if(DEBUG) {
+                                    assert curr.next != curr;
+                                }
+                                prev.next = curr.next;
+                                curr.next = null;
+                                if(DEBUG) {
+                                    assert prev.next != prev;
+                                }
+                                found = true;
+                                break;
+                            }
+                        }
+                        // go to the next entry
+                        prev = curr;
+                        curr = curr.next;
+                    }
+                    if (found) {
+                        returnPage = victim;
+                        ((CachedPage) returnPage).dpid = dpid;
+                    } //otherwise, someone took the same victim before we acquired the lock. try again!
+                } finally {
+                    bucket.bucketLock.unlock();
+                }
+            }
+        }
+        // if we found a page after all that, go ahead and finish
+        if (returnPage != null) {
+            ((CachedPage) returnPage).confiscated.set(true);
+            if (DEBUG) {
+                confiscateLock.lock();
+                try{
+                    confiscatedPages.add((CachedPage) returnPage);
+                    confiscatedPagesOwner.put((CachedPage) returnPage, Thread.currentThread().getStackTrace());
+                }
+                finally{
+                    confiscateLock.unlock();
+                }
+            }
+            return returnPage;
+        }
+        return null;
+    }
+
+    private ICachedPage getPageLoop(Supplier<ICachedPage> cachedPageSupplier)
+            throws HyracksDataException {
+        int cycleCount = 0;
+        try {
+            while (true) {
+                cycleCount++;
+                int startCleanedCount = cleanerThread.cleanedCount;
+                ICachedPage page = cachedPageSupplier.get();
+                if (page != null) {
+                    return page;
+                }
+                // no page available to confiscate. try kicking the cleaner thread.
+                synchronized (cleanerThread.threadLock) {
+                    try {
+                        pageCleanerPolicy.notifyVictimNotFound(cleanerThread.threadLock);
+                    } catch (InterruptedException e) {
+                        // Re-interrupt the thread so this gets handled later
+                        Thread.currentThread().interrupt();
+                    }
+                }
+                // Heuristic optimization. Check whether the cleaner thread has
+                // cleaned pages since we did our last pin attempt.
+                if (cleanerThread.cleanedCount - startCleanedCount > MIN_CLEANED_COUNT_DIFF) {
+                    // Don't go to sleep and wait for notification from the cleaner,
+                    // just try to pin again immediately.
+                    continue;
+                }
+                synchronized (cleanerThread.cleanNotification) {
+                    try {
+                        // it's OK to not loop on this wait, as we do not rely on any condition to be true on notify
+                        do {
+                            cleanerThread.cleanNotification.wait(PIN_MAX_WAIT_TIME);
+                        } while (false);
+                    } catch (InterruptedException e) {
+                        // Re-interrupt the thread so this gets handled later
+                        Thread.currentThread().interrupt();
+                    }
+                }
+                finishQueue();
+                if (cycleCount > MAX_PIN_ATTEMPT_CYCLES) {
+                    cycleCount = 0; // suppress warning below
+                    throw new HyracksDataException("Unable to find free page in buffer cache after "
+                            + MAX_PIN_ATTEMPT_CYCLES + " cycles (buffer cache undersized?)");
+                }
+            }
+        } finally {
+            if (cycleCount > PIN_ATTEMPT_CYCLES_WARNING_THRESHOLD && LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Took " + cycleCount + " cycles to find free page in buffer cache.  (buffer cache " +
+                        "undersized?)");
+            }
+        }
+    }
+
+
     @Override
     public void returnPage(ICachedPage page) {
         returnPage(page, true);
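
For readers skimming the hunk above, the two-tier policy in getPageLoop can be reduced to a small standalone sketch. This is an illustration only, not code from the change: the class TwoTierRetry, the method getWithRetry, and the warn callback are hypothetical names, the cleaner-thread notification and timed wait are left out, and the sketch fails exactly when the cycle count reaches the limit, which is an assumption based on the commit message rather than the comparison used in the patch.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical helper, not part of BufferCache: shows only the warn/fail thresholds.
final class TwoTierRetry {
    private TwoTierRetry() {
    }

    // Calls the supplier until it yields a non-null value.
    // Warns (via the callback) if more than warningThreshold cycles were needed,
    // and throws once maxCycles attempts have all returned null.
    static <T> T getWithRetry(Supplier<T> supplier, int warningThreshold, int maxCycles,
            Consumer<String> warn) {
        int cycleCount = 0;
        try {
            while (true) {
                cycleCount++;
                T result = supplier.get();
                if (result != null) {
                    return result;
                }
                // The real loop kicks the cleaner thread and waits here.
                if (cycleCount >= maxCycles) {
                    int attempted = cycleCount;
                    cycleCount = 0; // the exception already reports the problem; skip the warning
                    throw new IllegalStateException("no result after " + attempted + " cycles");
                }
            }
        } finally {
            // Runs on both the return path and the exception path.
            if (cycleCount > warningThreshold) {
                warn.accept("Took " + cycleCount + " cycles");
            }
        }
    }
}
```

Putting the warning in the finally block keeps a single exit point for the slow-success case, and resetting the counter before throwing avoids logging a warning on top of the exception.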

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1038
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I46fa6bbda8c2f81e5e570dd6c07e4f4b794ef5bb
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Michael Blow <mb...@apache.org>
