abdullah alamoudi has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/2387
Change subject: [NO ISSUE][STO] Perform IO reads in uninterruptible threads ...................................................................... [NO ISSUE][STO] Perform IO reads in uninterruptible threads Change-Id: Id28d57a222f42962284b24296cb9b91658e5dc77 --- M hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndexCursor.java M hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java M hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java 3 files changed, 183 insertions(+), 44 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/87/2387/1 diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndexCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndexCursor.java index f704921..00c5dce 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndexCursor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndexCursor.java @@ -26,14 +26,20 @@ /** * Represents an index cursor. The expected use * cursor = new cursor(); - * while (more predicates){ - * -cursor.open(predicate); - * -while (cursor.hasNext()){ - * --cursor.next() + * try{ + * -while (more predicates){ + * --cursor.open(predicate); + * --try{ + * ---while (cursor.hasNext()){ + * ----cursor.next() + * ---} + * --} finally{ + * ---cursor.close(); + * --} * -} - * -cursor.close(); + * } finally{ + * -cursor.destroy(); * } - * cursor.destroy(); * Each created cursor must have destroy called * Each successfully opened cursor must have close called * @@ -47,7 +53,8 @@ * When a cursor object is created, it is in the CLOSED state. 
* CLOSED: Legal calls are open() --> OPENED, or destroy() --> DESTROYED, close() --> no effect * OPENED: The only legal calls are hasNext(), next(), or close() --> CLOSED. - * DESTROYED: All calls are illegal. + * DESTROYED: The only legal call is destroy() which has no effect. + * * Cursors must enforce the cursor state machine */ public interface IIndexCursor extends IDestroyable { diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java index 1443bbc..4bc57fd 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java @@ -28,11 +28,14 @@ import java.util.List; import java.util.Map; import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -45,8 +48,10 @@ import org.apache.hyracks.api.lifecycle.ILifeCycleComponent; import org.apache.hyracks.api.replication.IIOReplicationManager; import org.apache.hyracks.api.util.IoUtil; +import org.apache.hyracks.storage.common.buffercache.CachedPage.State; import org.apache.hyracks.storage.common.file.BufferedFileHandle; import org.apache.hyracks.storage.common.file.IFileMapManager; +import 
org.apache.hyracks.util.InvokeUtil; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -55,6 +60,7 @@ private static final Logger LOGGER = LogManager.getLogger(); private static final int MAP_FACTOR = 3; + private static final CachedPage POISON_PILL = new CachedPage(); private static final int MIN_CLEANED_COUNT_DIFF = 3; private static final int PIN_MAX_WAIT_TIME = 50; @@ -66,7 +72,8 @@ private final int pageSize; private final int maxOpenFiles; - final IIOManager ioManager; + private final ExecutorService executor; + private final IIOManager ioManager; private final CacheBucket[] pageMap; private final IPageReplacementStrategy pageReplacementStrategy; private final IPageCleanerPolicy pageCleanerPolicy; @@ -75,6 +82,7 @@ private final Map<Integer, BufferedFileHandle> fileInfoMap; private final AsyncFIFOPageQueueManager fifoWriter; private final Queue<BufferCacheHeaderHelper> headerPageCache = new ConcurrentLinkedQueue<>(); + private final BlockingQueue<CachedPage> readRequests; //DEBUG private Level fileOpsLevel = Level.DEBUG; @@ -103,19 +111,43 @@ this.pageReplacementStrategy = pageReplacementStrategy; this.pageCleanerPolicy = pageCleanerPolicy; this.fileMapManager = fileMapManager; - - Executor executor = Executors.newCachedThreadPool(threadFactory); - fileInfoMap = new HashMap<>(); - cleanerThread = new CleanerThread(); - executor.execute(cleanerThread); - closed = false; - - fifoWriter = new AsyncFIFOPageQueueManager(this); - if (DEBUG) { - confiscatedPages = new ArrayList<>(); - confiscatedPagesOwner = new HashMap<>(); - confiscateLock = new ReentrantLock(); - pinnedPageOwner = new ConcurrentHashMap<>(); + int numReaders = ioManager.getIODevices().size() * 2; + readRequests = new ArrayBlockingQueue<>(pageReplacementStrategy.getMaxAllowedNumPages()); + executor = Executors.newFixedThreadPool(numReaders + 1, threadFactory); + try { + fileInfoMap = new HashMap<>(); + cleanerThread = new 
CleanerThread(); + executor.execute(cleanerThread); + for (int i = 0; i < numReaders; i++) { + executor.execute(new ReaderThread(i)); + } + closed = false; + fifoWriter = new AsyncFIFOPageQueueManager(this); + if (DEBUG) { + confiscatedPages = new ArrayList<>(); + confiscatedPagesOwner = new HashMap<>(); + confiscateLock = new ReentrantLock(); + pinnedPageOwner = new ConcurrentHashMap<>(); + } + } catch (Throwable th) { + try { + throw th; + } finally { + readRequests.offer(POISON_PILL); // NOSONAR will always succeed since the queue is empty + executor.shutdown(); + try { + if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { + LOGGER.log(Level.WARN, "Failure shutting down buffer cache executor service"); + } + } catch (InterruptedException e) { + LOGGER.log(Level.WARN, "Interrupted while shutting down buffer cache executor service"); + Thread.currentThread().interrupt(); + th.addSuppressed(e); + } catch (Throwable e) { + LOGGER.log(Level.WARN, "Failure shutting down buffer cache executor service", e); + th.addSuppressed(e); + } + } } } @@ -185,22 +217,22 @@ // Resolve race of multiple threads trying to read the page from // disk. 
synchronized (cPage) { - if (!cPage.valid) { + if (cPage.state != State.VALID) { try { - tryRead(cPage); - cPage.valid = true; - } catch (Exception e) { - LOGGER.log(Level.WARN, "Failure while trying to read a page from disk", e); - throw e; - } finally { - if (!cPage.valid) { - unpin(cPage); + if (cPage.state == State.INVALID) { + // submit request to read + cPage.state = State.READ_REQUESTED; + readRequests.put(cPage); } + cPage.awaitRead(); + } catch (Throwable th) { + unpin(cPage); + throw HyracksDataException.create(th); } } } } else { - cPage.valid = true; + cPage.state = State.VALID; } pageReplacementStrategy.notifyCachePageAccess(cPage); if (DEBUG) { @@ -449,7 +481,7 @@ buffer.append(" ").append(cp.cpid).append(" -> [") .append(BufferedFileHandle.getFileId(cp.dpid)).append(':') .append(BufferedFileHandle.getPageId(cp.dpid)).append(", ").append(cp.pinCount.get()) - .append(", ").append(cp.valid ? "valid" : "invalid").append(", ") + .append(", ").append(cp.state).append(", ") .append(cp.confiscated.get() ? "confiscated" : "physical").append(", ") .append(cp.dirty.get() ? "dirty" : "clean").append("]\n"); cp = cp.next; @@ -480,7 +512,7 @@ if (c.confiscated() || c.latch.getReadLockCount() != 0 || c.latch.getWriteHoldCount() != 0) { return false; } - if (c.valid) { + if (c.state == State.VALID) { reachableDpids.add(c.dpid); } } @@ -670,6 +702,55 @@ } } + private class ReaderThread implements Runnable { + private final int num; + + private ReaderThread(int num) { + this.num = num; + } + + @Override + public void run() { + while (true) { + CachedPage next; + try { + next = readRequests.take(); + } catch (InterruptedException e) { + LOGGER.log(Level.WARN, + "Reader thread - " + num + " was interrupted. 
Reader threads should never be interrupted"); + Thread.currentThread().interrupt(); + break; + } + if (next == POISON_PILL) { + LOGGER.log(Level.INFO, "Exiting BufferCache reader thread - " + num); + InvokeUtil.doUninterruptibly(() -> readRequests.put(POISON_PILL)); + if (Thread.currentThread().isInterrupted()) { + LOGGER.log(Level.ERROR, "Reader thread - " + num + + " was interrupted. Reader threads should never be interrupted"); + } + break; + } + if (next.state != State.READ_REQUESTED) { + LOGGER.log(Level.ERROR, + "Exiting BufferCache reader thread - " + num + ". Took a page with state = " + next.state); + break; + } + try { + tryRead(next); + next.state = State.VALID; + } catch (HyracksDataException e) { + next.readFailure = e; + next.state = State.READ_FAILED; + LOGGER.log(Level.WARN, "Reader thread - " + num + " failed to read a page", e); + } + synchronized (next) { + next.notifyAll(); + } + } + } + + } + private class CleanerThread implements Runnable { private volatile boolean shutdownStart = false; private volatile boolean shutdownComplete = false; @@ -798,6 +879,17 @@ } }); fileInfoMap.clear(); + } + InvokeUtil.doUninterruptibly(() -> readRequests.put(POISON_PILL)); + // NOSONAR will always succeed since the queue is empty + executor.shutdown(); + try { + if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { + LOGGER.log(Level.WARN, "Failure shutting down buffer cache executor service"); + } + } catch (InterruptedException e) { + LOGGER.log(Level.WARN, "Interrupted while shutting down buffer cache executor service"); + Thread.currentThread().interrupt(); } } @@ -1307,17 +1399,21 @@ finishQueue(); if (cycleCount > MAX_PIN_ATTEMPT_CYCLES) { cycleCount = 0; // suppress warning below - throw new HyracksDataException("Unable to find free page in buffer cache after " - + MAX_PIN_ATTEMPT_CYCLES + " cycles (buffer cache undersized?)" - + (DEBUG ? 
" ; " + (masterPinCount.get() - startingPinCount) - + " successful pins since start of cycle" : "")); + throw new HyracksDataException( + "Unable to find free page in buffer cache after " + MAX_PIN_ATTEMPT_CYCLES + + " cycles (buffer cache undersized?)" + (DEBUG + ? " ; " + (masterPinCount.get() - startingPinCount) + + " successful pins since start of cycle" + : "")); } } } finally { if (cycleCount > PIN_ATTEMPT_CYCLES_WARNING_THRESHOLD && LOGGER.isWarnEnabled()) { LOGGER.warn("Took " + cycleCount + " cycles to find free page in buffer cache. (buffer cache " - + "undersized?)" + (DEBUG ? " ; " + (masterPinCount.get() - startingPinCount) - + " successful pins since start of cycle" : "")); + + "undersized?)" + (DEBUG + ? " ; " + (masterPinCount.get() - startingPinCount) + + " successful pins since start of cycle" + : "")); } } } @@ -1343,7 +1439,7 @@ } try { cPage.reset(cPage.dpid); - cPage.valid = true; + cPage.state = State.VALID; cPage.next = bucket.cachedPage; bucket.cachedPage = cPage; cPage.pinCount.decrementAndGet(); diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java index bc0a04e..d7a55af 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java @@ -23,10 +23,19 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.hyracks.api.exceptions.HyracksDataException; + /** * @author yingyib */ public class CachedPage implements ICachedPageInternal { + public enum State { + INVALID, + READ_REQUESTED, + READ_FAILED, + VALID + } + final int cpid; ByteBuffer buffer; public final 
AtomicInteger pinCount; @@ -36,7 +45,7 @@ private final IPageReplacementStrategy pageReplacementStrategy; volatile long dpid; // disk page id (composed of file id and page id) CachedPage next; - volatile boolean valid; + volatile State state; final AtomicBoolean confiscated; private IQueueInfo queueInfo; private int multiplier; @@ -44,6 +53,7 @@ // DEBUG private static final boolean DEBUG = false; private final StackTraceElement[] ctorStack; + Throwable readFailure; //Constructor for making dummy entry for FIFO queue public CachedPage() { @@ -72,7 +82,7 @@ latch = new ReentrantReadWriteLock(true); replacementStrategyObject = pageReplacementStrategy.createPerPageStrategyObject(cpid); dpid = -1; - valid = false; + state = State.INVALID; confiscated = new AtomicBoolean(false); queueInfo = null; ctorStack = DEBUG ? new Throwable().getStackTrace() : null; @@ -81,7 +91,7 @@ public void reset(long dpid) { this.dpid = dpid; dirty.set(false); - valid = false; + state = State.INVALID; confiscated.set(false); pageReplacementStrategy.notifyCachePageReset(this); queueInfo = null; @@ -205,4 +215,30 @@ public boolean isLargePage() { return multiplier > 1; } + + /** + * Waits until the requested read of this page completes, i.e. until its state becomes VALID or READ_FAILED. + * This method is uninterruptible: an interrupt received while waiting is deferred and the thread's interrupt status is restored before returning. + * + * @throws HyracksDataException if the read failed; wraps the recorded read failure + */ + public synchronized void awaitRead() throws HyracksDataException { + boolean interrupted = false; + try { + while (state != State.VALID && state != State.READ_FAILED) { + try { + wait(); + } catch (InterruptedException e) { + interrupted = true; + } + } + if (state == State.READ_FAILED) { + throw HyracksDataException.create(readFailure); + } + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + } } -- To view, visit https://asterix-gerrit.ics.uci.edu/2387 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Id28d57a222f42962284b24296cb9b91658e5dc77 Gerrit-PatchSet: 1
Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>