abdullah alamoudi has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/2387
Change subject: [NO ISSUE][STO] Perform IO reads in uninterruptible threads
......................................................................
[NO ISSUE][STO] Perform IO reads in uninterruptible threads
Change-Id: Id28d57a222f42962284b24296cb9b91658e5dc77
---
M
hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndexCursor.java
M
hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
M
hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
3 files changed, 183 insertions(+), 44 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/87/2387/1
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndexCursor.java
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndexCursor.java
index f704921..00c5dce 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndexCursor.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndexCursor.java
@@ -26,14 +26,20 @@
/**
* Represents an index cursor. The expected use
* cursor = new cursor();
- * while (more predicates){
- * -cursor.open(predicate);
- * -while (cursor.hasNext()){
- * --cursor.next()
+ * try{
+ * -while (more predicates){
+ * --cursor.open(predicate);
+ * --try{
+ * ---while (cursor.hasNext()){
+ * ----cursor.next()
+ * ---}
+ * --} finally{
+ * ---cursor.close();
+ * --}
* -}
- * -cursor.close();
+ * } finally{
+ * -cursor.destroy();
* }
- * cursor.destroy();
* Each created cursor must have destroy called
* Each successfully opened cursor must have close called
*
@@ -47,7 +53,8 @@
* When a cursor object is created, it is in the CLOSED state.
* CLOSED: Legal calls are open() --> OPENED, or destroy() --> DESTROYED,
close() --> no effect
* OPENED: The only legal calls are hasNext(), next(), or close() --> CLOSED.
- * DESTROYED: All calls are illegal.
+ * DESTROYED: The only legal call is destroy() which has no effect.
+ *
* Cursors must enforce the cursor state machine
*/
public interface IIndexCursor extends IDestroyable {
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index 1443bbc..4bc57fd 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -28,11 +28,14 @@
import java.util.List;
import java.util.Map;
import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -45,8 +48,10 @@
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
import org.apache.hyracks.api.replication.IIOReplicationManager;
import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.storage.common.buffercache.CachedPage.State;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
import org.apache.hyracks.storage.common.file.IFileMapManager;
+import org.apache.hyracks.util.InvokeUtil;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -55,6 +60,7 @@
private static final Logger LOGGER = LogManager.getLogger();
private static final int MAP_FACTOR = 3;
+ private static final CachedPage POISON_PILL = new CachedPage();
private static final int MIN_CLEANED_COUNT_DIFF = 3;
private static final int PIN_MAX_WAIT_TIME = 50;
@@ -66,7 +72,8 @@
private final int pageSize;
private final int maxOpenFiles;
- final IIOManager ioManager;
+ private final ExecutorService executor;
+ private final IIOManager ioManager;
private final CacheBucket[] pageMap;
private final IPageReplacementStrategy pageReplacementStrategy;
private final IPageCleanerPolicy pageCleanerPolicy;
@@ -75,6 +82,7 @@
private final Map<Integer, BufferedFileHandle> fileInfoMap;
private final AsyncFIFOPageQueueManager fifoWriter;
private final Queue<BufferCacheHeaderHelper> headerPageCache = new
ConcurrentLinkedQueue<>();
+ private final BlockingQueue<CachedPage> readRequests;
//DEBUG
private Level fileOpsLevel = Level.DEBUG;
@@ -103,19 +111,43 @@
this.pageReplacementStrategy = pageReplacementStrategy;
this.pageCleanerPolicy = pageCleanerPolicy;
this.fileMapManager = fileMapManager;
-
- Executor executor = Executors.newCachedThreadPool(threadFactory);
- fileInfoMap = new HashMap<>();
- cleanerThread = new CleanerThread();
- executor.execute(cleanerThread);
- closed = false;
-
- fifoWriter = new AsyncFIFOPageQueueManager(this);
- if (DEBUG) {
- confiscatedPages = new ArrayList<>();
- confiscatedPagesOwner = new HashMap<>();
- confiscateLock = new ReentrantLock();
- pinnedPageOwner = new ConcurrentHashMap<>();
+ int numReaders = ioManager.getIODevices().size() * 2;
+ readRequests = new
ArrayBlockingQueue<>(pageReplacementStrategy.getMaxAllowedNumPages());
+ executor = Executors.newFixedThreadPool(numReaders + 1, threadFactory);
+ try {
+ fileInfoMap = new HashMap<>();
+ cleanerThread = new CleanerThread();
+ executor.execute(cleanerThread);
+ for (int i = 0; i < numReaders; i++) {
+ executor.execute(new ReaderThread(i));
+ }
+ closed = false;
+ fifoWriter = new AsyncFIFOPageQueueManager(this);
+ if (DEBUG) {
+ confiscatedPages = new ArrayList<>();
+ confiscatedPagesOwner = new HashMap<>();
+ confiscateLock = new ReentrantLock();
+ pinnedPageOwner = new ConcurrentHashMap<>();
+ }
+ } catch (Throwable th) {
+ try {
+ throw th;
+ } finally {
+ readRequests.offer(POISON_PILL); // NOSONAR will always
succeed since the queue is empty
+ executor.shutdown();
+ try {
+ if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+ LOGGER.log(Level.WARN, "Failure shutting down buffer
cache executor service");
+ }
+ } catch (InterruptedException e) {
+ LOGGER.log(Level.WARN, "Interrupted while shutting down
buffer cache executor service");
+ Thread.currentThread().interrupt();
+ th.addSuppressed(e);
+ } catch (Throwable e) {
+ LOGGER.log(Level.WARN, "Failure shutting down buffer cache
executor service", e);
+ th.addSuppressed(e);
+ }
+ }
}
}
@@ -185,22 +217,22 @@
// Resolve race of multiple threads trying to read the page from
// disk.
synchronized (cPage) {
- if (!cPage.valid) {
+ if (cPage.state != State.VALID) {
try {
- tryRead(cPage);
- cPage.valid = true;
- } catch (Exception e) {
- LOGGER.log(Level.WARN, "Failure while trying to read a
page from disk", e);
- throw e;
- } finally {
- if (!cPage.valid) {
- unpin(cPage);
+ if (cPage.state == State.INVALID) {
+ // submit request to read
+ cPage.state = State.READ_REQUESTED;
+ readRequests.put(cPage);
}
+ cPage.awaitRead();
+ } catch (Throwable th) {
+ unpin(cPage);
+ throw HyracksDataException.create(th);
}
}
}
} else {
- cPage.valid = true;
+ cPage.state = State.VALID;
}
pageReplacementStrategy.notifyCachePageAccess(cPage);
if (DEBUG) {
@@ -449,7 +481,7 @@
buffer.append(" ").append(cp.cpid).append(" -> [")
.append(BufferedFileHandle.getFileId(cp.dpid)).append(':')
.append(BufferedFileHandle.getPageId(cp.dpid)).append(",
").append(cp.pinCount.get())
- .append(", ").append(cp.valid ? "valid" :
"invalid").append(", ")
+ .append(", ").append(cp.state).append(", ")
.append(cp.confiscated.get() ? "confiscated" :
"physical").append(", ")
.append(cp.dirty.get() ? "dirty" :
"clean").append("]\n");
cp = cp.next;
@@ -480,7 +512,7 @@
if (c.confiscated() || c.latch.getReadLockCount() != 0 ||
c.latch.getWriteHoldCount() != 0) {
return false;
}
- if (c.valid) {
+ if (c.state == State.VALID) {
reachableDpids.add(c.dpid);
}
}
@@ -670,6 +702,55 @@
}
}
+    /**
+     * Worker that serves page read requests taken from {@code readRequests}.
+     * Each request is a page that was moved to {@link State#READ_REQUESTED} before being queued. The worker reads it
+     * with {@code tryRead}, moves it to {@link State#VALID} or {@link State#READ_FAILED}, and then wakes every thread
+     * waiting on the page's monitor (see {@link CachedPage#awaitRead()}).
+     * A worker exits when it takes {@code POISON_PILL}; it puts the pill back so that the other workers exit too.
+     */
+    private class ReaderThread implements Runnable {
+        private final int num;
+
+        private ReaderThread(int num) {
+            this.num = num;
+        }
+
+        @Override
+        public void run() {
+            while (true) {
+                CachedPage next;
+                try {
+                    next = readRequests.take();
+                } catch (InterruptedException e) {
+                    LOGGER.log(Level.WARN,
+                            "Reader thread - " + num + " was interrupted. Reader threads should never be interrupted");
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+                if (next == POISON_PILL) {
+                    LOGGER.log(Level.INFO, "Exiting BufferCache reader thread - " + num);
+                    // hand the pill on so the remaining reader threads also see it and exit
+                    InvokeUtil.doUninterruptibly(() -> readRequests.put(POISON_PILL));
+                    if (Thread.currentThread().isInterrupted()) {
+                        LOGGER.log(Level.ERROR, "Reader thread - " + num
+                                + " was interrupted. Reader threads should never be interrupted");
+                    }
+                    break;
+                }
+                if (next.state != State.READ_REQUESTED) {
+                    // Do not exit here: the reader pool is fixed-size, so a dead reader would leave future read
+                    // requests unserved and their (uninterruptible) waiters blocked forever. Skip the page instead.
+                    LOGGER.log(Level.ERROR, "BufferCache reader thread - " + num
+                            + " skipping a page with unexpected state = " + next.state);
+                    continue;
+                }
+                read(next);
+            }
+        }
+
+        /**
+         * Reads one requested page and publishes the outcome to the threads waiting on it. Never throws, so a
+         * failing read cannot terminate the reader thread.
+         */
+        private void read(CachedPage page) {
+            try {
+                tryRead(page);
+                page.state = State.VALID;
+            } catch (Throwable th) { // NOSONAR any failure must be handed to the waiters, or they block forever
+                // readFailure is written before the volatile state so a waiter observing READ_FAILED sees it
+                page.readFailure = th;
+                page.state = State.READ_FAILED;
+                LOGGER.log(Level.WARN, "Reader thread - " + num + " failed to read a page", th);
+            }
+            synchronized (page) {
+                page.notifyAll();
+            }
+        }
+    }
+
private class CleanerThread implements Runnable {
private volatile boolean shutdownStart = false;
private volatile boolean shutdownComplete = false;
@@ -798,6 +879,17 @@
}
});
fileInfoMap.clear();
+ }
+ InvokeUtil.doUninterruptibly(() -> readRequests.put(POISON_PILL));
+ // NOSONAR will always succeed since the queue is empty
+ executor.shutdown();
+ try {
+ if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+ LOGGER.log(Level.WARN, "Failure shutting down buffer cache
executor service");
+ }
+ } catch (InterruptedException e) {
+ LOGGER.log(Level.WARN, "Interrupted while shutting down buffer
cache executor service");
+ Thread.currentThread().interrupt();
}
}
@@ -1307,17 +1399,21 @@
finishQueue();
if (cycleCount > MAX_PIN_ATTEMPT_CYCLES) {
cycleCount = 0; // suppress warning below
- throw new HyracksDataException("Unable to find free page
in buffer cache after "
- + MAX_PIN_ATTEMPT_CYCLES + " cycles (buffer cache
undersized?)"
- + (DEBUG ? " ; " + (masterPinCount.get() -
startingPinCount)
- + " successful pins since start of cycle"
: ""));
+ throw new HyracksDataException(
+ "Unable to find free page in buffer cache after "
+ MAX_PIN_ATTEMPT_CYCLES
+ + " cycles (buffer cache undersized?)" +
(DEBUG
+ ? " ; " + (masterPinCount.get() -
startingPinCount)
+ + " successful pins since
start of cycle"
+ : ""));
}
}
} finally {
if (cycleCount > PIN_ATTEMPT_CYCLES_WARNING_THRESHOLD &&
LOGGER.isWarnEnabled()) {
LOGGER.warn("Took " + cycleCount + " cycles to find free page
in buffer cache. (buffer cache "
- + "undersized?)" + (DEBUG ? " ; " +
(masterPinCount.get() - startingPinCount)
- + " successful pins since start of cycle" :
""));
+ + "undersized?)" + (DEBUG
+ ? " ; " + (masterPinCount.get() -
startingPinCount)
+ + " successful pins since start of
cycle"
+ : ""));
}
}
}
@@ -1343,7 +1439,7 @@
}
try {
cPage.reset(cPage.dpid);
- cPage.valid = true;
+ cPage.state = State.VALID;
cPage.next = bucket.cachedPage;
bucket.cachedPage = cPage;
cPage.pinCount.decrementAndGet();
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
index bc0a04e..d7a55af 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
@@ -23,10 +23,19 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
/**
* @author yingyib
*/
public class CachedPage implements ICachedPageInternal {
+    /**
+     * Where a cached page stands with respect to loading its contents from disk.
+     * The declaration order of the constants is unchanged.
+     */
+    public enum State {
+        /** Contents not loaded: the initial state of a regular page and the state after {@link CachedPage#reset(long)}. */
+        INVALID,
+        /** A read of the contents has been requested and has not finished yet. */
+        READ_REQUESTED,
+        /** The requested read failed; the cause is kept in {@code readFailure}. */
+        READ_FAILED,
+        /** Contents are usable. */
+        VALID
+    }
+
final int cpid;
ByteBuffer buffer;
public final AtomicInteger pinCount;
@@ -36,7 +45,7 @@
private final IPageReplacementStrategy pageReplacementStrategy;
volatile long dpid; // disk page id (composed of file id and page id)
CachedPage next;
- volatile boolean valid;
+ volatile State state;
final AtomicBoolean confiscated;
private IQueueInfo queueInfo;
private int multiplier;
@@ -44,6 +53,7 @@
// DEBUG
private static final boolean DEBUG = false;
private final StackTraceElement[] ctorStack;
+ Throwable readFailure;
//Constructor for making dummy entry for FIFO queue
public CachedPage() {
@@ -72,7 +82,7 @@
latch = new ReentrantReadWriteLock(true);
replacementStrategyObject =
pageReplacementStrategy.createPerPageStrategyObject(cpid);
dpid = -1;
- valid = false;
+ state = State.INVALID;
confiscated = new AtomicBoolean(false);
queueInfo = null;
ctorStack = DEBUG ? new Throwable().getStackTrace() : null;
@@ -81,7 +91,7 @@
public void reset(long dpid) {
this.dpid = dpid;
dirty.set(false);
- valid = false;
+ state = State.INVALID;
confiscated.set(false);
pageReplacementStrategy.notifyCachePageReset(this);
queueInfo = null;
@@ -205,4 +215,30 @@
public boolean isLargePage() {
return multiplier > 1;
}
+
+    /**
+     * Blocks the calling thread until the pending read of this page completes, that is, until the page is either
+     * {@link State#VALID} or {@link State#READ_FAILED}. This method is uninterruptible: an interrupt received while
+     * waiting does not end the wait, but the thread's interrupt status is restored before this method returns or
+     * throws.
+     *
+     * @throws HyracksDataException
+     *             if the read failed; it is created from the failure recorded in {@code readFailure}
+     */
+    public synchronized void awaitRead() throws HyracksDataException {
+        boolean restoreInterrupt = false;
+        try {
+            while (!(state == State.VALID || state == State.READ_FAILED)) {
+                try {
+                    wait();
+                } catch (InterruptedException ignored) {
+                    // keep waiting; the interrupt is re-asserted in the finally block
+                    restoreInterrupt = true;
+                }
+            }
+            if (state != State.READ_FAILED) {
+                return;
+            }
+            throw HyracksDataException.create(readFailure);
+        } finally {
+            if (restoreInterrupt) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/2387
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Id28d57a222f42962284b24296cb9b91658e5dc77
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>