abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2274

Change subject: [NO ISSUE][STO] Unpin pages when interrupted during reads
......................................................................

[NO ISSUE][STO] Unpin pages when interrupted during reads

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- This change fixes a bug that occurs when a thread is interrupted
  while pinning a page that is not already in the buffer cache.
- The fix: if a failure happens during the read call, the page is
  unpinned before the exception is rethrown.
- A test case is added.

Change-Id: I8d1c52fcf89ed90e8ef6019cd77842dd7468df49
---
M 
hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
M 
hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java
A 
hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/resources/log4j2.xml
3 files changed, 139 insertions(+), 1 deletion(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/74/2274/1

diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index 6212896..feb3739 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -186,7 +186,13 @@
             // disk.
             synchronized (cPage) {
                 if (!cPage.valid) {
-                    tryRead(cPage);
+                    try {
+                        tryRead(cPage);
+                    }catch (Exception e){
+                        LOGGER.log(Level.WARN,"Failure while trying to read a 
page from disk", e);
+                        unpin(cPage);
+                        throw e;
+                    }
                     cPage.valid = true;
                 }
             }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java
 
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java
index 26ad457..8ed8eb9 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java
@@ -26,8 +26,11 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
@@ -39,11 +42,15 @@
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
 import org.apache.hyracks.test.support.TestStorageManagerComponentHolder;
 import org.apache.hyracks.test.support.TestUtils;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Test;
 
 public class BufferCacheTest {
+    private static final Logger LOGGER = LogManager.getLogger();
     protected static final List<String> openedFiles = new ArrayList<>();
     protected static final SimpleDateFormat simpleDateFormat = new 
SimpleDateFormat("ddMMyy-hhmmssSS");
 
@@ -62,6 +69,101 @@
     }
 
     @Test
+    public void interruptPinTest() throws Exception {
+        /*
+         * This test will create a buffer cache of a small size (4 pages)
+         * and then will create a file of size = 16 pages and have 4 threads
+         * pin and unpin the pages one by one, while another thread interrupts 
them
+         * for some time. It will then close the file and ensure that all the 
pages are
+         * unpinned and that no problems are found.
+         */
+        final int bufferCacheNumPages = 4;
+        TestStorageManagerComponentHolder.init(PAGE_SIZE, bufferCacheNumPages, 
MAX_OPEN_FILES);
+        IIOManager ioManager = 
TestStorageManagerComponentHolder.getIOManager();
+        IBufferCache bufferCache =
+                
TestStorageManagerComponentHolder.getBufferCache(ctx.getJobletContext().getServiceContext());
+        final long duration = TimeUnit.SECONDS.toMillis(20);
+        final String fileName = getFileName();
+        final FileReference file = ioManager.resolve(fileName);
+        final int fileId = bufferCache.createFile(file);
+        final int numPages = 16;
+        bufferCache.openFile(fileId);
+        for(int i=0;i<numPages;i++){
+            long dpid = BufferedFileHandle.getDiskPageId(fileId, i);
+            ICachedPage page = bufferCache.confiscatePage(dpid);
+            page.getBuffer().putInt(0, i);
+            bufferCache.createFIFOQueue().put(page);
+        }
+        bufferCache.finishQueue();
+        bufferCache.closeFile(fileId);
+        ExecutorService executor = 
Executors.newFixedThreadPool(bufferCacheNumPages);
+        MutableObject<Thread>[] readers = new 
MutableObject[bufferCacheNumPages];
+        Future<Void>[] futures = new Future[bufferCacheNumPages];
+        for(int i=0;i<bufferCacheNumPages;i++){
+            readers[i] = new MutableObject<>();
+            final int threadNumber = i;
+            futures[i] = executor.submit(new Callable<Void>() {
+                @Override
+                public Void call() throws Exception {
+                    synchronized (readers[threadNumber]){
+                        readers[threadNumber].setValue(Thread.currentThread());
+                        readers[threadNumber].notifyAll();
+                    }
+                    // for the configured duration, just read the pages one by one.
+                    // At the end, close the file
+                    bufferCache.openFile(fileId);
+                    final long start = System.currentTimeMillis();
+                    int pageNumber = 0;
+                    int totalReads = 0;
+                    int successfulReads = 0;
+                    int interruptedReads = 0;
+                    while(System.currentTimeMillis() - start < duration){
+                        totalReads++;
+                        pageNumber = (pageNumber+1)%numPages;
+                        try{
+                            long dpid = 
BufferedFileHandle.getDiskPageId(fileId, pageNumber);
+                            ICachedPage page = bufferCache.pin(dpid, false);
+                            successfulReads++;
+                            bufferCache.unpin(page);
+                        } catch(HyracksDataException hde){
+                            interruptedReads++;
+                            // clear the thread's interrupt status before the next read
+                            Thread.interrupted();
+                        }
+                    }
+                    bufferCache.closeFile(fileId);
+                    LOGGER.log(Level.INFO,"Total reads = " + totalReads +" 
Successful Reads = " + successfulReads +" Interrupted Reads = " + 
interruptedReads);
+                    return null;
+                }
+            });
+        }
+
+        for(int i=0;i<bufferCacheNumPages;i++){
+            synchronized (readers[i]){
+                while(readers[i].getValue() == null){
+                    readers[i].wait();
+                }
+            }
+        }
+        final long start = System.currentTimeMillis();
+
+        while(System.currentTimeMillis() - start < duration){
+            for(int i=0;i<bufferCacheNumPages;i++){
+                readers[i].getValue().interrupt();
+            }
+            Thread.sleep(25);
+        }
+        try {
+            for (int i = 0; i < bufferCacheNumPages; i++) {
+                futures[i].get();
+            }
+        } finally {
+            bufferCache.deleteFile(fileId);
+            bufferCache.close();
+        }
+    }
+
+    @Test
     public void simpleOpenPinCloseTest() throws HyracksException {
         TestStorageManagerComponentHolder.init(PAGE_SIZE, NUM_PAGES, 
MAX_OPEN_FILES);
         IBufferCache bufferCache =
diff --git 
a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/resources/log4j2.xml
 
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/resources/log4j2.xml
new file mode 100644
index 0000000..28c1a61
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/resources/log4j2.xml
@@ -0,0 +1,30 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements.  See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership.  The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License.  You may obtain a copy of the License at
+ !
+ !   http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied.  See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<Configuration status="INFO">
+  <Appenders>
+    <Console name="Console" target="SYSTEM_ERR">
+      <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - 
%msg%n"/>
+    </Console>
+  </Appenders>
+  <Loggers>
+    <Root level="INFO">
+      <AppenderRef ref="Console"/>
+    </Root>
+  </Loggers>
+</Configuration>
\ No newline at end of file

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2274
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I8d1c52fcf89ed90e8ef6019cd77842dd7468df49
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>

Reply via email to