Michael Blow has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/894
Change subject: ASTERIXDB-1438: BufferCache spins indefinitely...
......................................................................
ASTERIXDB-1438: BufferCache spins indefinitely...
Fixes ASTERIXDB-1438: BufferCache spins indefinitely when cache is exceeded.
After three unsuccessful cycles (each traversing clock pages three times)
and waiting for dirty pages to be cleaned, an exception is thrown instead
of retrying forever.
Change-Id: I327a7423bd630c96e16601b1a3a2a21f558f9f50
---
M
hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
1 file changed, 51 insertions(+), 44 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/94/894/1
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index 2b6dd73..4b7f3af 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -54,6 +54,7 @@
private static final int MIN_CLEANED_COUNT_DIFF = 3;
private static final int PIN_MAX_WAIT_TIME = 50;
+ private static final int MAX_PIN_ATTEMPT_CYCLES = 3;
public static final boolean DEBUG = false;
private final int pageSize;
@@ -74,7 +75,7 @@
private ConcurrentHashMap<CachedPage, StackTraceElement[]> pinnedPageOwner;
//!DEBUG
private IIOReplicationManager ioReplicationManager;
- private List<ICachedPageInternal> cachedPages = new ArrayList<>();
+ private final List<ICachedPageInternal> cachedPages = new ArrayList<>();
private boolean closed;
@@ -226,7 +227,8 @@
}
private CachedPage findPage(long dpid, boolean virtual) throws
HyracksDataException {
- while (true) {
+
+ for (int i = 0; i < MAX_PIN_ATTEMPT_CYCLES; i++) {
int startCleanedCount = cleanerThread.cleanedCount;
CachedPage cPage = null;
@@ -428,11 +430,11 @@
victimBucket.cachedPage = victim.next;
} else {
CachedPage victimPrev = victimBucket.cachedPage;
- while (victimPrev != null && victimPrev.next !=
victim) {
+ while (victimPrev.next != victim) {
victimPrev = victimPrev.next;
- }
- if(DEBUG) {
- assert victimPrev != null;
+ if (victimPrev == null) {
+ throw new IllegalStateException();
+ }
}
victimPrev.next = victim.next;
}
@@ -449,8 +451,8 @@
return victim;
}
}
- synchronized (cleanerThread) {
- pageCleanerPolicy.notifyVictimNotFound(cleanerThread);
+ synchronized (cleanerThread.threadLock) {
+
pageCleanerPolicy.notifyVictimNotFound(cleanerThread.threadLock);
}
// Heuristic optimization. Check whether the cleaner thread has
// cleaned pages since we did our last pin attempt.
@@ -467,6 +469,8 @@
}
}
}
+ throw new HyracksDataException("Unable to find free page in buffer
cache after " + MAX_PIN_ATTEMPT_CYCLES +
+ " cycles (buffer cache undersized?)");
}
private String dumpState() {
@@ -652,15 +656,14 @@
}
private class CleanerThread extends Thread {
- private boolean shutdownStart = false;
- private boolean shutdownComplete = false;
+ private volatile boolean shutdownStart = false;
+ private volatile boolean shutdownComplete = false;
+ private final Object threadLock = new Object();
private final Object cleanNotification = new Object();
// Simply keeps incrementing this counter when a page is cleaned.
// Used to implement wait-for-cleanerthread heuristic optimizations.
// A waiter can detect whether pages have been cleaned.
- // No need to make this var volatile or synchronize it's access in any
- // way because it is used for heuristics.
- private int cleanedCount = 0;
+ private volatile int cleanedCount = 0;
public CleanerThread() {
setPriority(Thread.NORM_PRIORITY);
@@ -711,33 +714,35 @@
}
@Override
- public synchronized void run() {
- try {
- while (true) {
- pageCleanerPolicy.notifyCleanCycleStart(this);
- int curPage = 0;
+ public void run() {
+ synchronized (threadLock) {
+ try {
while (true) {
- synchronized (cachedPages) {
- if (curPage >= cachedPages.size()) {
- break;
+ pageCleanerPolicy.notifyCleanCycleStart(threadLock);
+ int curPage = 0;
+ while (true) {
+ synchronized (cachedPages) {
+ if (curPage >= cachedPages.size()) {
+ break;
+ }
+ CachedPage cPage = (CachedPage)
cachedPages.get(curPage);
+ if (cPage != null) {
+ cleanPage(cPage, false);
+ }
}
- CachedPage cPage = (CachedPage)
cachedPages.get(curPage);
- if (cPage != null) {
- cleanPage(cPage, false);
- }
+ curPage++;
}
- curPage++;
+ if (shutdownStart) {
+ break;
+ }
+ pageCleanerPolicy.notifyCleanCycleFinish(threadLock);
}
- if (shutdownStart) {
- break;
- }
- pageCleanerPolicy.notifyCleanCycleFinish(this);
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ shutdownComplete = true;
+ threadLock.notifyAll();
}
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- shutdownComplete = true;
- notifyAll();
}
}
}
@@ -746,12 +751,12 @@
public void close() {
closed = true;
fifoWriter.destroyQueue();
- synchronized (cleanerThread) {
+ synchronized (cleanerThread.threadLock) {
cleanerThread.shutdownStart = true;
- cleanerThread.notifyAll();
+ cleanerThread.threadLock.notifyAll();
while (!cleanerThread.shutdownComplete) {
try {
- cleanerThread.wait();
+ cleanerThread.threadLock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
@@ -882,7 +887,7 @@
private boolean invalidateIfFileIdMatch(int fileId, CachedPage cPage,
boolean flushDirtyPages)
throws HyracksDataException {
if (BufferedFileHandle.getFileId(cPage.dpid) == fileId) {
- int pinCount = -1;
+ int pinCount;
if (cPage.dirty.get()) {
if (flushDirtyPages) {
write(cPage);
@@ -935,7 +940,7 @@
@Override
public void force(int fileId, boolean metadata) throws
HyracksDataException {
- BufferedFileHandle fInfo = null;
+ BufferedFileHandle fInfo;
synchronized (fileInfoMap) {
fInfo = fileInfoMap.get(fileId);
}
@@ -1135,7 +1140,7 @@
@Override
public ICachedPage confiscatePage(long dpid) throws HyracksDataException {
- return confiscatePage(dpid, () ->
pageReplacementStrategy.findVictim());
+ return confiscatePage(dpid, pageReplacementStrategy::findVictim);
}
@Override
@@ -1145,7 +1150,7 @@
private ICachedPage confiscatePage(long dpid,
Supplier<ICachedPageInternal> victimSupplier)
throws HyracksDataException {
- while (true) {
+ for (int i = 0; i < MAX_PIN_ATTEMPT_CYCLES; i++) {
int startCleanedCount = cleanerThread.cleanedCount;
ICachedPage returnPage = null;
CachedPage victim = (CachedPage) victimSupplier.get();
@@ -1238,8 +1243,8 @@
return returnPage;
}
// no page available to confiscate. try kicking the cleaner thread.
- synchronized (cleanerThread) {
- pageCleanerPolicy.notifyVictimNotFound(cleanerThread);
+ synchronized (cleanerThread.threadLock) {
+
pageCleanerPolicy.notifyVictimNotFound(cleanerThread.threadLock);
}
// Heuristic optimization. Check whether the cleaner thread has
// cleaned pages since we did our last pin attempt.
@@ -1256,6 +1261,8 @@
}
}
}
+ throw new HyracksDataException("Unable to find free page in buffer
cache after " + MAX_PIN_ATTEMPT_CYCLES +
+ " cycles (buffer cache undersized?)");
}
@Override
--
To view, visit https://asterix-gerrit.ics.uci.edu/894
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I327a7423bd630c96e16601b1a3a2a21f558f9f50
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Michael Blow <[email protected]>