abdullah alamoudi has submitted this change and it was merged. Change subject: [NO ISSUE][STO] Perform IO reads in uninterruptible threads ......................................................................
[NO ISSUE][STO] Perform IO reads in uninterruptible threads - user model changes: no - storage format changes: no - interface changes: no details: - Previously, IO reads were performed on the task threads. - Task threads can be interrupted, which can cause deadlocks due to a JDK synchronization bug. - After this change, there are 2 IO read threads per IO device. - Threads will receive IO read requests and process them. - Such threads are never interrupted and are killed through the use of a poison pill. Change-Id: Id28d57a222f42962284b24296cb9b91658e5dc77 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2387 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Contrib: Jenkins <[email protected]> Reviewed-by: Michael Blow <[email protected]> --- M hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml M hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndexCursor.java M hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java M hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java 4 files changed, 188 insertions(+), 46 deletions(-) Approvals: Anon. E. Moose #1000171: Jenkins: Verified; ; Verified Michael Blow: Looks good to me, approved Objections: Jenkins: Violations found diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml index 9936e1a..40422f4 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml @@ -16,18 +16,16 @@ ! specific language governing permissions and limitations ! under the License. 
!--> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <artifactId>hyracks-storage-common</artifactId> <name>hyracks-storage-common</name> - <parent> <groupId>org.apache.hyracks</groupId> <artifactId>hyracks</artifactId> <version>0.3.4-SNAPSHOT</version> </parent> - <licenses> <license> <name>Apache License, Version 2.0</name> @@ -36,7 +34,6 @@ <comments>A business-friendly OSS license</comments> </license> </licenses> - <properties> <root.dir>${basedir}/../..</root.dir> </properties> @@ -52,8 +49,13 @@ <version>${project.version}</version> </dependency> <dependency> + <groupId>org.apache.hyracks</groupId> + <artifactId>hyracks-util</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-api</artifactId> </dependency> </dependencies> -</project> +</project> \ No newline at end of file diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndexCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndexCursor.java index f704921..00c5dce 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndexCursor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndexCursor.java @@ -26,14 +26,20 @@ /** * Represents an index cursor. 
The expected use * cursor = new cursor(); - * while (more predicates){ - * -cursor.open(predicate); - * -while (cursor.hasNext()){ - * --cursor.next() + * try{ + * -while (more predicates){ + * --cursor.open(predicate); + * --try{ + * ---while (cursor.hasNext()){ + * ----cursor.next() + * ---} + * --} finally{ + * ---cursor.close(); + * --} * -} - * -cursor.close(); + * } finally{ + * -cursor.destroy(); * } - * cursor.destroy(); * Each created cursor must have destroy called * Each successfully opened cursor must have close called * @@ -47,7 +53,8 @@ * When a cursor object is created, it is in the CLOSED state. * CLOSED: Legal calls are open() --> OPENED, or destroy() --> DESTROYED, close() --> no effect * OPENED: The only legal calls are hasNext(), next(), or close() --> CLOSED. - * DESTROYED: All calls are illegal. + * DESTROYED: The only legal call is destroy() which has no effect. + * * Cursors must enforce the cursor state machine */ public interface IIndexCursor extends IDestroyable { diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java index 1443bbc..73969db 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java @@ -28,11 +28,14 @@ import java.util.List; import java.util.Map; import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import 
java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -45,8 +48,10 @@ import org.apache.hyracks.api.lifecycle.ILifeCycleComponent; import org.apache.hyracks.api.replication.IIOReplicationManager; import org.apache.hyracks.api.util.IoUtil; +import org.apache.hyracks.storage.common.buffercache.CachedPage.State; import org.apache.hyracks.storage.common.file.BufferedFileHandle; import org.apache.hyracks.storage.common.file.IFileMapManager; +import org.apache.hyracks.util.InvokeUtil; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -55,6 +60,7 @@ private static final Logger LOGGER = LogManager.getLogger(); private static final int MAP_FACTOR = 3; + private static final CachedPage POISON_PILL = new CachedPage(); private static final int MIN_CLEANED_COUNT_DIFF = 3; private static final int PIN_MAX_WAIT_TIME = 50; @@ -66,7 +72,8 @@ private final int pageSize; private final int maxOpenFiles; - final IIOManager ioManager; + private final ExecutorService executor; + private final IIOManager ioManager; private final CacheBucket[] pageMap; private final IPageReplacementStrategy pageReplacementStrategy; private final IPageCleanerPolicy pageCleanerPolicy; @@ -75,6 +82,7 @@ private final Map<Integer, BufferedFileHandle> fileInfoMap; private final AsyncFIFOPageQueueManager fifoWriter; private final Queue<BufferCacheHeaderHelper> headerPageCache = new ConcurrentLinkedQueue<>(); + private final BlockingQueue<CachedPage> readRequests; //DEBUG private Level fileOpsLevel = Level.DEBUG; @@ -103,19 +111,43 @@ this.pageReplacementStrategy = pageReplacementStrategy; this.pageCleanerPolicy = pageCleanerPolicy; this.fileMapManager = fileMapManager; - - Executor executor = Executors.newCachedThreadPool(threadFactory); - fileInfoMap = new HashMap<>(); - 
cleanerThread = new CleanerThread(); - executor.execute(cleanerThread); - closed = false; - - fifoWriter = new AsyncFIFOPageQueueManager(this); - if (DEBUG) { - confiscatedPages = new ArrayList<>(); - confiscatedPagesOwner = new HashMap<>(); - confiscateLock = new ReentrantLock(); - pinnedPageOwner = new ConcurrentHashMap<>(); + int numReaders = ioManager.getIODevices().size() * 2; + readRequests = new ArrayBlockingQueue<>(pageReplacementStrategy.getMaxAllowedNumPages()); + executor = Executors.newFixedThreadPool(numReaders + 1, threadFactory); + try { + fileInfoMap = new HashMap<>(); + cleanerThread = new CleanerThread(); + executor.execute(cleanerThread); + for (int i = 0; i < numReaders; i++) { + executor.execute(new ReaderThread(i)); + } + closed = false; + fifoWriter = new AsyncFIFOPageQueueManager(this); + if (DEBUG) { + confiscatedPages = new ArrayList<>(); + confiscatedPagesOwner = new HashMap<>(); + confiscateLock = new ReentrantLock(); + pinnedPageOwner = new ConcurrentHashMap<>(); + } + } catch (Throwable th) { + try { + throw th; + } finally { + readRequests.offer(POISON_PILL); // NOSONAR will always succeed since the queue is empty + executor.shutdown(); + try { + if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { + LOGGER.log(Level.WARN, "Failure shutting down buffer cache executor service"); + } + } catch (InterruptedException e) { + LOGGER.log(Level.WARN, "Interrupted while shutting down buffer cache executor service"); + Thread.currentThread().interrupt(); + th.addSuppressed(e); + } catch (Throwable e) { + LOGGER.log(Level.WARN, "Failure shutting down buffer cache executor service", e); + th.addSuppressed(e); + } + } } } @@ -185,22 +217,27 @@ // Resolve race of multiple threads trying to read the page from // disk. 
synchronized (cPage) { - if (!cPage.valid) { + if (cPage.state != State.VALID) { try { - tryRead(cPage); - cPage.valid = true; - } catch (Exception e) { - LOGGER.log(Level.WARN, "Failure while trying to read a page from disk", e); - throw e; - } finally { - if (!cPage.valid) { - unpin(cPage); + // Will attempt to re-read even if previous read failed + if (cPage.state == State.INVALID || cPage.state == State.READ_FAILED) { + // submit request to read + cPage.state = State.READ_REQUESTED; + readRequests.put(cPage); } + cPage.awaitRead(); + } catch (InterruptedException e) { + cPage.state = State.INVALID; + unpin(cPage); + throw HyracksDataException.create(e); + } catch (Throwable th) { + unpin(cPage); + throw HyracksDataException.create(th); } } } } else { - cPage.valid = true; + cPage.state = State.VALID; } pageReplacementStrategy.notifyCachePageAccess(cPage); if (DEBUG) { @@ -449,7 +486,7 @@ buffer.append(" ").append(cp.cpid).append(" -> [") .append(BufferedFileHandle.getFileId(cp.dpid)).append(':') .append(BufferedFileHandle.getPageId(cp.dpid)).append(", ").append(cp.pinCount.get()) - .append(", ").append(cp.valid ? "valid" : "invalid").append(", ") + .append(", ").append(cp.state).append(", ") .append(cp.confiscated.get() ? "confiscated" : "physical").append(", ") .append(cp.dirty.get() ? "dirty" : "clean").append("]\n"); cp = cp.next; @@ -480,7 +517,7 @@ if (c.confiscated() || c.latch.getReadLockCount() != 0 || c.latch.getWriteHoldCount() != 0) { return false; } - if (c.valid) { + if (c.state == State.VALID) { reachableDpids.add(c.dpid); } } @@ -519,6 +556,9 @@ read(cPage); return; } catch (HyracksDataException readException) { + if (Thread.interrupted()) { + LOGGER.log(Level.WARN, "Ignoring interrupt. 
Reader threads should never be interrupted."); + } if (readException.getErrorCode() == ErrorCode.CANNOT_READ_CLOSED_FILE && i <= MAX_PAGE_READ_ATTEMPTS) { /** * if the read failure was due to another thread closing the file channel because @@ -530,8 +570,7 @@ LOGGER.log(Level.WARN, String.format("Failed to read page. Retrying attempt (%d/%d)", i + 1, MAX_PAGE_READ_ATTEMPTS), readException); } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw HyracksDataException.create(e); + LOGGER.log(Level.WARN, "Ignoring interrupt. Reader threads should never be interrupted."); } } else { throw readException; @@ -670,6 +709,54 @@ } } + private class ReaderThread implements Runnable { + private final int num; + + private ReaderThread(int num) { + this.num = num; + } + + @Override + public void run() { + Thread.currentThread().setName("Buffer-Cache-Reader-" + num); + while (true) { + CachedPage next; + try { + next = readRequests.take(); + } catch (InterruptedException e) { + LOGGER.log(Level.WARN, "Ignoring interrupt. Reader threads should never be interrupted."); + break; + } + if (next == POISON_PILL) { + LOGGER.log(Level.INFO, "Exiting"); + InvokeUtil.doUninterruptibly(() -> readRequests.put(POISON_PILL)); + if (Thread.interrupted()) { + LOGGER.log(Level.ERROR, "Ignoring interrupt. Reader threads should never be interrupted."); + } + break; + } + if (next.state != State.READ_REQUESTED) { + LOGGER.log(Level.ERROR, + "Exiting BufferCache reader thread. 
Took a page with state = {} out of the queue", + next.state); + break; + } + try { + tryRead(next); + next.state = State.VALID; + } catch (HyracksDataException e) { + next.readFailure = e; + next.state = State.READ_FAILED; + LOGGER.log(Level.WARN, "Failed to read a page", e); + } + synchronized (next) { + next.notifyAll(); + } + } + } + + } + private class CleanerThread implements Runnable { private volatile boolean shutdownStart = false; private volatile boolean shutdownComplete = false; @@ -798,6 +885,16 @@ } }); fileInfoMap.clear(); + } + InvokeUtil.doUninterruptibly(() -> readRequests.put(POISON_PILL)); + executor.shutdown(); + try { + if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { + LOGGER.log(Level.WARN, "Failure shutting down buffer cache executor service"); + } + } catch (InterruptedException e) { + LOGGER.log(Level.WARN, "Interrupted while shutting down buffer cache executor service"); + Thread.currentThread().interrupt(); } } @@ -1343,7 +1440,7 @@ } try { cPage.reset(cPage.dpid); - cPage.valid = true; + cPage.state = State.VALID; cPage.next = bucket.cachedPage; bucket.cachedPage = cPage; cPage.pinCount.decrementAndGet(); diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java index bc0a04e..d7a55af 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java @@ -23,10 +23,19 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.hyracks.api.exceptions.HyracksDataException; + /** * @author yingyib */ public class CachedPage implements ICachedPageInternal { + public 
enum State { + INVALID, + READ_REQUESTED, + READ_FAILED, + VALID + } + final int cpid; ByteBuffer buffer; public final AtomicInteger pinCount; @@ -36,7 +45,7 @@ private final IPageReplacementStrategy pageReplacementStrategy; volatile long dpid; // disk page id (composed of file id and page id) CachedPage next; - volatile boolean valid; + volatile State state; final AtomicBoolean confiscated; private IQueueInfo queueInfo; private int multiplier; @@ -44,6 +53,7 @@ // DEBUG private static final boolean DEBUG = false; private final StackTraceElement[] ctorStack; + Throwable readFailure; //Constructor for making dummy entry for FIFO queue public CachedPage() { @@ -72,7 +82,7 @@ latch = new ReentrantReadWriteLock(true); replacementStrategyObject = pageReplacementStrategy.createPerPageStrategyObject(cpid); dpid = -1; - valid = false; + state = State.INVALID; confiscated = new AtomicBoolean(false); queueInfo = null; ctorStack = DEBUG ? new Throwable().getStackTrace() : null; @@ -81,7 +91,7 @@ public void reset(long dpid) { this.dpid = dpid; dirty.set(false); - valid = false; + state = State.INVALID; confiscated.set(false); pageReplacementStrategy.notifyCachePageReset(this); queueInfo = null; @@ -205,4 +215,30 @@ public boolean isLargePage() { return multiplier > 1; } + + /** + * Wait for the page requested to be read to complete the read operation + * This method is uninterrubtible + * + * @throws HyracksDataException + */ + public synchronized void awaitRead() throws HyracksDataException { + boolean interrupted = false; + try { + while (state != State.VALID && state != State.READ_FAILED) { + try { + wait(); + } catch (InterruptedException e) { + interrupted = true; + } + } + if (state == State.READ_FAILED) { + throw HyracksDataException.create(readFailure); + } + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + } } -- To view, visit https://asterix-gerrit.ics.uci.edu/2387 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings 
Gerrit-MessageType: merged Gerrit-Change-Id: Id28d57a222f42962284b24296cb9b91658e5dc77 Gerrit-PatchSet: 7 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: abdullah alamoudi <[email protected]>
