abdullah alamoudi has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/2274
Change subject: [NO ISSUE][STO] Unpin pages when interrupted during reads
......................................................................
[NO ISSUE][STO] Unpin pages when interrupted during reads
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- This change fixes a bug that occurs when a thread is interrupted
while pinning a page that is not already in the buffer cache:
the page was left pinned.
- The fix is to unpin the page if a failure happens during the
read call.
- A test case is added
Change-Id: I8d1c52fcf89ed90e8ef6019cd77842dd7468df49
---
M
hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
M
hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java
A
hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/resources/log4j2.xml
3 files changed, 139 insertions(+), 1 deletion(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/74/2274/1
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index 6212896..feb3739 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -186,7 +186,13 @@
// disk.
synchronized (cPage) {
if (!cPage.valid) {
- tryRead(cPage);
+ try {
+ tryRead(cPage);
+ }catch (Exception e){
+ LOGGER.log(Level.WARN,"Failure while trying to read a
page from disk", e);
+ unpin(cPage);
+ throw e;
+ }
cPage.valid = true;
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java
index 26ad457..8ed8eb9 100644
---
a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java
@@ -26,8 +26,11 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
@@ -39,11 +42,15 @@
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
import org.apache.hyracks.test.support.TestStorageManagerComponentHolder;
import org.apache.hyracks.test.support.TestUtils;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
public class BufferCacheTest {
+ private static final Logger LOGGER = LogManager.getLogger();
protected static final List<String> openedFiles = new ArrayList<>();
protected static final SimpleDateFormat simpleDateFormat = new
SimpleDateFormat("ddMMyy-hhmmssSS");
@@ -62,6 +69,101 @@
}
@Test
+ public void interruptPinTest() throws Exception {
+ /*
+ * This test creates a small buffer cache (4 pages) and a file of
+ * 16 pages, then has 4 threads pin and unpin the pages one by one
+ * while another thread repeatedly interrupts them for some time.
+ * It then closes the file and ensures that all the pages are
+ * unpinned and that no problems are found.
+ */
+ final int bufferCacheNumPages = 4;
+ TestStorageManagerComponentHolder.init(PAGE_SIZE, bufferCacheNumPages,
MAX_OPEN_FILES);
+ IIOManager ioManager =
TestStorageManagerComponentHolder.getIOManager();
+ IBufferCache bufferCache =
+
TestStorageManagerComponentHolder.getBufferCache(ctx.getJobletContext().getServiceContext());
+ final long duration = TimeUnit.SECONDS.toMillis(20);
+ final String fileName = getFileName();
+ final FileReference file = ioManager.resolve(fileName);
+ final int fileId = bufferCache.createFile(file);
+ final int numPages = 16;
+ bufferCache.openFile(fileId);
+ for(int i=0;i<numPages;i++){
+ long dpid = BufferedFileHandle.getDiskPageId(fileId, i);
+ ICachedPage page = bufferCache.confiscatePage(dpid);
+ page.getBuffer().putInt(0, i);
+ bufferCache.createFIFOQueue().put(page);
+ }
+ bufferCache.finishQueue();
+ bufferCache.closeFile(fileId);
+ ExecutorService executor =
Executors.newFixedThreadPool(bufferCacheNumPages);
+ MutableObject<Thread>[] readers = new
MutableObject[bufferCacheNumPages];
+ Future<Void>[] futures = new Future[bufferCacheNumPages];
+ for(int i=0;i<bufferCacheNumPages;i++){
+ readers[i] = new MutableObject<>();
+ final int threadNumber = i;
+ futures[i] = executor.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ synchronized (readers[threadNumber]){
+ readers[threadNumber].setValue(Thread.currentThread());
+ readers[threadNumber].notifyAll();
+ }
+ // for the test duration (20 seconds), just read the pages one by one.
+ // At the end, close the file
+ bufferCache.openFile(fileId);
+ final long start = System.currentTimeMillis();
+ int pageNumber = 0;
+ int totalReads = 0;
+ int successfulReads = 0;
+ int interruptedReads = 0;
+ while(System.currentTimeMillis() - start < duration){
+ totalReads++;
+ pageNumber = (pageNumber+1)%numPages;
+ try{
+ long dpid =
BufferedFileHandle.getDiskPageId(fileId, pageNumber);
+ ICachedPage page = bufferCache.pin(dpid, false);
+ successfulReads++;
+ bufferCache.unpin(page);
+ } catch(HyracksDataException hde){
+ interruptedReads++;
+ // clear the thread's interrupted status before the next read
+ Thread.interrupted();
+ }
+ }
+ bufferCache.closeFile(fileId);
+ LOGGER.log(Level.INFO,"Total reads = " + totalReads +"
Successful Reads = " + successfulReads +" Interrupted Reads = " +
interruptedReads);
+ return null;
+ }
+ });
+ }
+
+ for(int i=0;i<bufferCacheNumPages;i++){
+ synchronized (readers[i]){
+ while(readers[i].getValue() == null){
+ readers[i].wait();
+ }
+ }
+ }
+ final long start = System.currentTimeMillis();
+
+ while(System.currentTimeMillis() - start < duration){
+ for(int i=0;i<bufferCacheNumPages;i++){
+ readers[i].getValue().interrupt();
+ }
+ Thread.sleep(25);
+ }
+ try {
+ for (int i = 0; i < bufferCacheNumPages; i++) {
+ futures[i].get();
+ }
+ } finally {
+ bufferCache.deleteFile(fileId);
+ bufferCache.close();
+ }
+ }
+
+ @Test
public void simpleOpenPinCloseTest() throws HyracksException {
TestStorageManagerComponentHolder.init(PAGE_SIZE, NUM_PAGES,
MAX_OPEN_FILES);
IBufferCache bufferCache =
diff --git
a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/resources/log4j2.xml
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/resources/log4j2.xml
new file mode 100644
index 0000000..28c1a61
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/resources/log4j2.xml
@@ -0,0 +1,30 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements. See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership. The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License. You may obtain a copy of the License at
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied. See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<Configuration status="INFO">
+ <Appenders>
+ <Console name="Console" target="SYSTEM_ERR">
+ <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} -
%msg%n"/>
+ </Console>
+ </Appenders>
+ <Loggers>
+ <Root level="INFO">
+ <AppenderRef ref="Console"/>
+ </Root>
+ </Loggers>
+</Configuration>
\ No newline at end of file
--
To view, visit https://asterix-gerrit.ics.uci.edu/2274
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I8d1c52fcf89ed90e8ef6019cd77842dd7468df49
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>