abdullah alamoudi has submitted this change and it was merged. Change subject: [NO ISSUE][STO] Unpin pages when interrupted during reads ......................................................................
[NO ISSUE][STO] Unpin pages when interrupted during reads - user model changes: no - storage format changes: no - interface changes: no Details: - This change fixes a bug that happens when a thread pinning a page that is not already in the buffer cache is interrupted. - The fix is that if a failure happens during the read call, the page is unpinned. - A test case is added Change-Id: I8d1c52fcf89ed90e8ef6019cd77842dd7468df49 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2274 Sonar-Qube: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Michael Blow <[email protected]> Tested-by: Jenkins <[email protected]> --- M hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/pom.xml M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java 3 files changed, 126 insertions(+), 2 deletions(-) Approvals: Anon. E. Moose #1000171: Jenkins: Verified; No violations found; Verified Michael Blow: Looks good to me, approved diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java index 6212896..302c7b2 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java @@ -186,8 +186,17 @@ // disk. synchronized (cPage) { if (!cPage.valid) { - tryRead(cPage); - cPage.valid = true; + try { + tryRead(cPage); + cPage.valid = true; + } catch (Exception e) { + LOGGER.log(Level.WARN, "Failure while trying to read a page from disk", e); + throw e; + } finally { + if (!cPage.valid) { + unpin(cPage); + } + } } } } else { diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/pom.xml b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/pom.xml index f93df0a..bb31885 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/pom.xml +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/pom.xml @@ -68,5 +68,13 @@ <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-api</artifactId> + </dependency> </dependencies> </project> \ No newline at end of file diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java index 26ad457..f94914c 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java @@ -26,8 +26,15 @@ import java.util.List; import java.util.Map; import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.lang3.mutable.Mutable; +import org.apache.commons.lang3.mutable.MutableObject; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.HyracksException; @@ -39,11 +46,15 @@ import org.apache.hyracks.storage.common.file.BufferedFileHandle; import org.apache.hyracks.test.support.TestStorageManagerComponentHolder; import org.apache.hyracks.test.support.TestUtils; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Test; public class BufferCacheTest { + private static final Logger LOGGER = LogManager.getLogger(); protected static final List<String> openedFiles = new ArrayList<>(); protected static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("ddMMyy-hhmmssSS"); @@ -62,6 +73,102 @@ } @Test + public void interruptPinTest() throws Exception { + /* + * This test will create a buffer cache of a small size (4 pages) + * and then will create a file of size = 16 pages and have 4 threads + * pin and unpin the pages one by one. and another thread interrupts them + * for some time.. It then will close the file and ensure that all the pages are + * unpinned and that no problems are found + */ + final int bufferCacheNumPages = 4; + TestStorageManagerComponentHolder.init(PAGE_SIZE, bufferCacheNumPages, MAX_OPEN_FILES); + IIOManager ioManager = TestStorageManagerComponentHolder.getIOManager(); + IBufferCache bufferCache = + TestStorageManagerComponentHolder.getBufferCache(ctx.getJobletContext().getServiceContext()); + final long duration = TimeUnit.SECONDS.toMillis(20); + final String fileName = getFileName(); + final FileReference file = ioManager.resolve(fileName); + final int fileId = bufferCache.createFile(file); + final int numPages = 16; + bufferCache.openFile(fileId); + for (int i = 0; i < numPages; i++) { + long dpid = BufferedFileHandle.getDiskPageId(fileId, i); + ICachedPage page = bufferCache.confiscatePage(dpid); + page.getBuffer().putInt(0, i); + bufferCache.createFIFOQueue().put(page); + } + bufferCache.finishQueue(); + bufferCache.closeFile(fileId); + ExecutorService executor = Executors.newFixedThreadPool(bufferCacheNumPages); + MutableObject<Thread>[] readers = new MutableObject[bufferCacheNumPages]; + Future<Void>[] futures = new Future[bufferCacheNumPages]; + for (int i = 0; i < bufferCacheNumPages; i++) { + readers[i] = new MutableObject<>(); + final int threadNumber = i; + futures[i] = executor.submit(new Callable<Void>() { + @Override + public Void call() throws Exception { + synchronized (readers[threadNumber]) { + readers[threadNumber].setValue(Thread.currentThread()); + readers[threadNumber].notifyAll(); + } + // for duration, just read the pages one by one. + // At the end, close the file + bufferCache.openFile(fileId); + final long start = System.currentTimeMillis(); + int pageNumber = 0; + int totalReads = 0; + int successfulReads = 0; + int interruptedReads = 0; + while (System.currentTimeMillis() - start < duration) { + totalReads++; + pageNumber = (pageNumber + 1) % numPages; + try { + long dpid = BufferedFileHandle.getDiskPageId(fileId, pageNumber); + ICachedPage page = bufferCache.pin(dpid, false); + successfulReads++; + bufferCache.unpin(page); + } catch (HyracksDataException hde) { + interruptedReads++; + // clear + Thread.interrupted(); + } + } + bufferCache.closeFile(fileId); + LOGGER.log(Level.INFO, "Total reads = " + totalReads + " Successful Reads = " + successfulReads + + " Interrupted Reads = " + interruptedReads); + return null; + } + }); + } + + for (int i = 0; i < bufferCacheNumPages; i++) { + synchronized (readers[i]) { + while (readers[i].getValue() == null) { + readers[i].wait(); + } + } + } + final long start = System.currentTimeMillis(); + + while (System.currentTimeMillis() - start < duration) { + for (int i = 0; i < bufferCacheNumPages; i++) { + readers[i].getValue().interrupt(); + } + Thread.sleep(25); // NOSONAR Sleep so some reads are successful + } + try { + for (int i = 0; i < bufferCacheNumPages; i++) { + futures[i].get(); + } + } finally { + bufferCache.deleteFile(fileId); + bufferCache.close(); + } + } + + @Test public void simpleOpenPinCloseTest() throws HyracksException { TestStorageManagerComponentHolder.init(PAGE_SIZE, NUM_PAGES, MAX_OPEN_FILES); IBufferCache bufferCache = -- To view, visit https://asterix-gerrit.ics.uci.edu/2274 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I8d1c52fcf89ed90e8ef6019cd77842dd7468df49 Gerrit-PatchSet: 6 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Ian Maxon <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]> Gerrit-Reviewer: abdullah alamoudi <[email protected]>
