Luo Chen has submitted this change and it was merged. Change subject: [NO ISSUE] Use Async Write Mode in ResultState ......................................................................
[NO ISSUE] Use Async Write Mode in ResultState - user model changes: no - storage format changes: no - interface changes: no Details: - Use Async write mode in ResultState to improve it's throughput. - For concurrent reader, use the same file handle (since it's a RandomAccessFile) for both read/write. Reference counting is used to ensure the file is properly opened/closed. Change-Id: Ia053f8e258759881583deb509425b585bc3c4bf2 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2560 Tested-by: Jenkins <[email protected]> Contrib: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: abdullah alamoudi <[email protected]> --- M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java 1 file changed, 73 insertions(+), 57 deletions(-) Approvals: Anon. E. Moose #1000171: abdullah alamoudi: Looks good to me, approved Jenkins: Verified; ; Verified Objections: Jenkins: Violations found diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java index b832b20..6b35912 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java @@ -61,9 +61,9 @@ private FileReference fileRef; - private IFileHandle writeFileHandle; + private IFileHandle fileHandle; - private IFileHandle readFileHandle; + private volatile int referenceCount = 0; private long size; @@ -86,12 +86,13 @@ localPageList = new ArrayList<>(); fileRef = null; - writeFileHandle = null; + fileHandle = null; } public synchronized void open() { size = 0; persistentSize = 0; + referenceCount = 0; } public synchronized void close() { @@ -112,25 +113,29 @@ } private void closeWriteFileHandle() { - if (writeFileHandle != null) { + if (fileHandle != null) { + doCloseFileHandle(); + } + } + + private void doCloseFileHandle() { + if (--referenceCount == 0) { + // close the file if there is no more reference try { - ioManager.close(writeFileHandle); + ioManager.close(fileHandle); } catch (IOException e) { // Since file handle could not be closed, just ignore. } - writeFileHandle = null; + fileHandle = null; } } public synchronized void write(ByteBuffer buffer) throws HyracksDataException { if (fileRef == null) { - String fName = FILE_PREFIX + String.valueOf(resultSetPartitionId.getPartition()); - fileRef = fileFactory.createUnmanagedWorkspaceFile(fName); - writeFileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_WRITE, - IIOManager.FileSyncMode.METADATA_ASYNC_DATA_SYNC); + initWriteFileHandle(); } - size += ioManager.syncWrite(writeFileHandle, size, buffer); + size += ioManager.syncWrite(fileHandle, size, buffer); notifyAll(); } @@ -165,9 +170,8 @@ } public synchronized void readClose() throws HyracksDataException { - if (readFileHandle != null) { - ioManager.close(readFileHandle); - readFileHandle = null; + if (fileHandle != null) { + doCloseFileHandle(); } } @@ -185,51 +189,49 @@ return readSize; } - if (readFileHandle == null) { + if (fileHandle == null) { initReadFileHandle(); } - readSize = ioManager.syncRead(readFileHandle, offset, buffer); + readSize = ioManager.syncRead(fileHandle, offset, buffer); return readSize; } - public long read(DatasetMemoryManager datasetMemoryManager, long offset, ByteBuffer buffer) + public synchronized long read(DatasetMemoryManager datasetMemoryManager, long offset, ByteBuffer buffer) throws HyracksDataException { long readSize = 0; - synchronized (this) { - while (offset >= size && !eos.get() && !failed.get()) { - try { - wait(); - } catch (InterruptedException e) { - throw HyracksDataException.create(e); - } + while (offset >= size && !eos.get() && !failed.get()) { + try { + wait(); + } catch (InterruptedException e) { + throw HyracksDataException.create(e); } + } - if ((offset >= size && eos.get()) || failed.get()) { + if ((offset >= size && eos.get()) || failed.get()) { + return readSize; + } + + if (offset < persistentSize) { + if (fileHandle == null) { + initReadFileHandle(); + } + readSize = ioManager.syncRead(fileHandle, offset, buffer); + if (readSize < 0) { + throw new HyracksDataException("Premature end of file"); + } + } + + if (readSize < buffer.capacity()) { + long localPageOffset = offset - persistentSize; + int localPageIndex = (int) (localPageOffset / DatasetMemoryManager.getPageSize()); + int pageOffset = (int) (localPageOffset % DatasetMemoryManager.getPageSize()); + Page page = getPage(localPageIndex); + if (page == null) { return readSize; } - - if (offset < persistentSize) { - if (readFileHandle == null) { - initReadFileHandle(); - } - readSize = ioManager.syncRead(readFileHandle, offset, buffer); - if (readSize < 0) { - throw new HyracksDataException("Premature end of file"); - } - } - - if (readSize < buffer.capacity()) { - long localPageOffset = offset - persistentSize; - int localPageIndex = (int) (localPageOffset / DatasetMemoryManager.getPageSize()); - int pageOffset = (int) (localPageOffset % DatasetMemoryManager.getPageSize()); - Page page = getPage(localPageIndex); - if (page == null) { - return readSize; - } - readSize += buffer.remaining(); - buffer.put(page.getBuffer().array(), pageOffset, buffer.remaining()); - } + readSize += buffer.remaining(); + buffer.put(page.getBuffer().array(), pageOffset, buffer.remaining()); } datasetMemoryManager.pageReferenced(resultSetPartitionId); return readSize; @@ -245,21 +247,17 @@ // If we do not have any pages to be given back close the write channel since we don't write any more, return null. if (page == null) { - ioManager.close(writeFileHandle); + ioManager.close(fileHandle); return null; } page.getBuffer().flip(); if (fileRef == null) { - String fName = FILE_PREFIX + String.valueOf(resultSetPartitionId.getPartition()); - fileRef = fileFactory.createUnmanagedWorkspaceFile(fName); - writeFileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_WRITE, - IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC); - notifyAll(); + initWriteFileHandle(); } - long delta = ioManager.syncWrite(writeFileHandle, persistentSize, page.getBuffer()); + long delta = ioManager.syncWrite(fileHandle, persistentSize, page.getBuffer()); persistentSize += delta; return page; } @@ -325,8 +323,23 @@ return page; } + private void initWriteFileHandle() throws HyracksDataException { + if (fileHandle == null) { + String fName = FILE_PREFIX + String.valueOf(resultSetPartitionId.getPartition()); + fileRef = fileFactory.createUnmanagedWorkspaceFile(fName); + fileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_WRITE, + IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC); + if (referenceCount != 0) { + throw new IllegalStateException("Illegal reference count " + referenceCount); + } + referenceCount = 1; + notifyAll(); // NOSONAR: always called from a synchronized block + } + } + private void initReadFileHandle() throws HyracksDataException { while (fileRef == null && !failed.get()) { + // wait for writer to create the file try { wait(); } catch (InterruptedException e) { @@ -336,9 +349,12 @@ if (failed.get()) { return; } - - readFileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_ONLY, - IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC); + if (fileHandle == null) { + // fileHandle has been closed by the writer, create it again + fileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_ONLY, + IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC); + } + referenceCount++; } @Override -- To view, visit https://asterix-gerrit.ics.uci.edu/2560 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: Ia053f8e258759881583deb509425b585bc3c4bf2 Gerrit-PatchSet: 6 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Luo Chen <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Ian Maxon <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Luo Chen <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: abdullah alamoudi <[email protected]>
