Luo Chen has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2560

Change subject: [NO ISSUE] Use Async Write Mode in ResultState
......................................................................

[NO ISSUE] Use Async Write Mode in ResultState

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Use Async write mode in ResultState to improve it's throughput.
- For concurrent reader, use the same file handle (since it's a
RandomAccessFile) for both read/write. Reference counting is used to
ensure the file is properly opened/closed.

Change-Id: Ia053f8e258759881583deb509425b585bc3c4bf2
---
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
1 file changed, 44 insertions(+), 29 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/60/2560/1

diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
index b832b20..eaef3cc 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
@@ -25,6 +25,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hyracks.api.dataflow.state.IStateObject;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -61,9 +62,9 @@
 
     private FileReference fileRef;
 
-    private IFileHandle writeFileHandle;
+    private IFileHandle fileHandle;
 
-    private IFileHandle readFileHandle;
+    private final AtomicInteger referenceCount = new AtomicInteger(0);
 
     private long size;
 
@@ -86,12 +87,13 @@
         localPageList = new ArrayList<>();
 
         fileRef = null;
-        writeFileHandle = null;
+        fileHandle = null;
     }
 
     public synchronized void open() {
         size = 0;
         persistentSize = 0;
+        referenceCount.set(0);
     }
 
     public synchronized void close() {
@@ -112,25 +114,29 @@
     }
 
     private void closeWriteFileHandle() {
-        if (writeFileHandle != null) {
+        if (fileHandle != null) {
+            doCloseFileHandle();
+        }
+    }
+
+    private void doCloseFileHandle() {
+        if (referenceCount.decrementAndGet() == 0) {
+            // close the file if there is no more reference
             try {
-                ioManager.close(writeFileHandle);
+                ioManager.close(fileHandle);
             } catch (IOException e) {
                 // Since file handle could not be closed, just ignore.
             }
-            writeFileHandle = null;
+            fileHandle = null;
         }
     }
 
     public synchronized void write(ByteBuffer buffer) throws 
HyracksDataException {
         if (fileRef == null) {
-            String fName = FILE_PREFIX + 
String.valueOf(resultSetPartitionId.getPartition());
-            fileRef = fileFactory.createUnmanagedWorkspaceFile(fName);
-            writeFileHandle = ioManager.open(fileRef, 
IIOManager.FileReadWriteMode.READ_WRITE,
-                    IIOManager.FileSyncMode.METADATA_ASYNC_DATA_SYNC);
+            initWriteFileHandle();
         }
 
-        size += ioManager.syncWrite(writeFileHandle, size, buffer);
+        size += ioManager.syncWrite(fileHandle, size, buffer);
         notifyAll();
     }
 
@@ -165,9 +171,8 @@
     }
 
     public synchronized void readClose() throws HyracksDataException {
-        if (readFileHandle != null) {
-            ioManager.close(readFileHandle);
-            readFileHandle = null;
+        if (fileHandle != null) {
+            doCloseFileHandle();
         }
     }
 
@@ -185,10 +190,10 @@
             return readSize;
         }
 
-        if (readFileHandle == null) {
+        if (fileHandle == null) {
             initReadFileHandle();
         }
-        readSize = ioManager.syncRead(readFileHandle, offset, buffer);
+        readSize = ioManager.syncRead(fileHandle, offset, buffer);
 
         return readSize;
     }
@@ -210,10 +215,10 @@
             }
 
             if (offset < persistentSize) {
-                if (readFileHandle == null) {
+                if (fileHandle == null) {
                     initReadFileHandle();
                 }
-                readSize = ioManager.syncRead(readFileHandle, offset, buffer);
+                readSize = ioManager.syncRead(fileHandle, offset, buffer);
                 if (readSize < 0) {
                     throw new HyracksDataException("Premature end of file");
                 }
@@ -245,21 +250,17 @@
 
         // If we do not have any pages to be given back close the write 
channel since we don't write any more, return null.
         if (page == null) {
-            ioManager.close(writeFileHandle);
+            ioManager.close(fileHandle);
             return null;
         }
 
         page.getBuffer().flip();
 
         if (fileRef == null) {
-            String fName = FILE_PREFIX + 
String.valueOf(resultSetPartitionId.getPartition());
-            fileRef = fileFactory.createUnmanagedWorkspaceFile(fName);
-            writeFileHandle = ioManager.open(fileRef, 
IIOManager.FileReadWriteMode.READ_WRITE,
-                    IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
-            notifyAll();
+            initWriteFileHandle();
         }
 
-        long delta = ioManager.syncWrite(writeFileHandle, persistentSize, 
page.getBuffer());
+        long delta = ioManager.syncWrite(fileHandle, persistentSize, 
page.getBuffer());
         persistentSize += delta;
         return page;
     }
@@ -325,8 +326,23 @@
         return page;
     }
 
+    private void initWriteFileHandle() throws HyracksDataException {
+        if (fileHandle == null) {
+            String fName = FILE_PREFIX + 
String.valueOf(resultSetPartitionId.getPartition());
+            fileRef = fileFactory.createUnmanagedWorkspaceFile(fName);
+            fileHandle = ioManager.open(fileRef, 
IIOManager.FileReadWriteMode.READ_WRITE,
+                    IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+            if (referenceCount.get() != 0) {
+                throw new IllegalStateException("Illegal reference count " + 
referenceCount.get());
+            }
+            referenceCount.set(1);
+            notifyAll();
+        }
+    }
+
     private void initReadFileHandle() throws HyracksDataException {
-        while (fileRef == null && !failed.get()) {
+        while (fileHandle == null && !failed.get()) {
+            // wait for writer to create fileHandle
             try {
                 wait();
             } catch (InterruptedException e) {
@@ -335,10 +351,9 @@
         }
         if (failed.get()) {
             return;
+        } else {
+            referenceCount.incrementAndGet();
         }
-
-        readFileHandle = ioManager.open(fileRef, 
IIOManager.FileReadWriteMode.READ_ONLY,
-                IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
     }
 
     @Override

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2560
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ia053f8e258759881583deb509425b585bc3c4bf2
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Luo Chen <cl...@uci.edu>

Reply via email to