Luo Chen has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/2560
Change subject: [NO ISSUE] Use Async Write Mode in ResultState ...................................................................... [NO ISSUE] Use Async Write Mode in ResultState - user model changes: no - storage format changes: no - interface changes: no Details: - Use Async write mode in ResultState to improve it's throughput. - For concurrent reader, use the same file handle (since it's a RandomAccessFile) for both read/write. Reference counting is used to ensure the file is properly opened/closed. Change-Id: Ia053f8e258759881583deb509425b585bc3c4bf2 --- M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java 1 file changed, 44 insertions(+), 29 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/60/2560/1 diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java index b832b20..eaef3cc 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hyracks.api.dataflow.state.IStateObject; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -61,9 +62,9 @@ private FileReference fileRef; - private IFileHandle writeFileHandle; + private IFileHandle fileHandle; - private IFileHandle readFileHandle; + private final AtomicInteger referenceCount = new AtomicInteger(0); private long size; @@ -86,12 +87,13 @@ localPageList = new ArrayList<>(); fileRef = null; - writeFileHandle = null; + fileHandle = null; } public synchronized void open() { size = 0; persistentSize = 0; + referenceCount.set(0); } public synchronized void close() { @@ -112,25 +114,29 @@ } private void closeWriteFileHandle() { - if (writeFileHandle != null) { + if (fileHandle != null) { + doCloseFileHandle(); + } + } + + private void doCloseFileHandle() { + if (referenceCount.decrementAndGet() == 0) { + // close the file if there is no more reference try { - ioManager.close(writeFileHandle); + ioManager.close(fileHandle); } catch (IOException e) { // Since file handle could not be closed, just ignore. } - writeFileHandle = null; + fileHandle = null; } } public synchronized void write(ByteBuffer buffer) throws HyracksDataException { if (fileRef == null) { - String fName = FILE_PREFIX + String.valueOf(resultSetPartitionId.getPartition()); - fileRef = fileFactory.createUnmanagedWorkspaceFile(fName); - writeFileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_WRITE, - IIOManager.FileSyncMode.METADATA_ASYNC_DATA_SYNC); + initWriteFileHandle(); } - size += ioManager.syncWrite(writeFileHandle, size, buffer); + size += ioManager.syncWrite(fileHandle, size, buffer); notifyAll(); } @@ -165,9 +171,8 @@ } public synchronized void readClose() throws HyracksDataException { - if (readFileHandle != null) { - ioManager.close(readFileHandle); - readFileHandle = null; + if (fileHandle != null) { + doCloseFileHandle(); } } @@ -185,10 +190,10 @@ return readSize; } - if (readFileHandle == null) { + if (fileHandle == null) { initReadFileHandle(); } - readSize = ioManager.syncRead(readFileHandle, offset, buffer); + readSize = ioManager.syncRead(fileHandle, offset, buffer); return readSize; } @@ -210,10 +215,10 @@ } if (offset < persistentSize) { - if (readFileHandle == null) { + if (fileHandle == null) { initReadFileHandle(); } - readSize = ioManager.syncRead(readFileHandle, offset, buffer); + readSize = ioManager.syncRead(fileHandle, offset, buffer); if (readSize < 0) { throw new HyracksDataException("Premature end of file"); } @@ -245,21 +250,17 @@ // If we do not have any pages to be given back close the write channel since we don't write any more, return null. if (page == null) { - ioManager.close(writeFileHandle); + ioManager.close(fileHandle); return null; } page.getBuffer().flip(); if (fileRef == null) { - String fName = FILE_PREFIX + String.valueOf(resultSetPartitionId.getPartition()); - fileRef = fileFactory.createUnmanagedWorkspaceFile(fName); - writeFileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_WRITE, - IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC); - notifyAll(); + initWriteFileHandle(); } - long delta = ioManager.syncWrite(writeFileHandle, persistentSize, page.getBuffer()); + long delta = ioManager.syncWrite(fileHandle, persistentSize, page.getBuffer()); persistentSize += delta; return page; } @@ -325,8 +326,23 @@ return page; } + private void initWriteFileHandle() throws HyracksDataException { + if (fileHandle == null) { + String fName = FILE_PREFIX + String.valueOf(resultSetPartitionId.getPartition()); + fileRef = fileFactory.createUnmanagedWorkspaceFile(fName); + fileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_WRITE, + IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC); + if (referenceCount.get() != 0) { + throw new IllegalStateException("Illegal reference count " + referenceCount.get()); + } + referenceCount.set(1); + notifyAll(); + } + } + private void initReadFileHandle() throws HyracksDataException { - while (fileRef == null && !failed.get()) { + while (fileHandle == null && !failed.get()) { + // wait for writer to create fileHandle try { wait(); } catch (InterruptedException e) { @@ -335,10 +351,9 @@ } if (failed.get()) { return; + } else { + referenceCount.incrementAndGet(); } - - readFileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_ONLY, - IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC); } @Override -- To view, visit https://asterix-gerrit.ics.uci.edu/2560 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Ia053f8e258759881583deb509425b585bc3c4bf2 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Luo Chen <cl...@uci.edu>