Luo Chen has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/2560
Change subject: [NO ISSUE] Use Async Write Mode in ResultState
......................................................................
[NO ISSUE] Use Async Write Mode in ResultState
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Use Async write mode in ResultState to improve its throughput.
- For concurrent readers, use the same file handle (since it's a
RandomAccessFile) for both reads and writes. Reference counting is used to
ensure the file is properly opened/closed.
Change-Id: Ia053f8e258759881583deb509425b585bc3c4bf2
---
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
1 file changed, 44 insertions(+), 29 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/60/2560/1
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
index b832b20..eaef3cc 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
@@ -25,6 +25,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hyracks.api.dataflow.state.IStateObject;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -61,9 +62,9 @@
private FileReference fileRef;
- private IFileHandle writeFileHandle;
+ private IFileHandle fileHandle;
- private IFileHandle readFileHandle;
+ private final AtomicInteger referenceCount = new AtomicInteger(0);
private long size;
@@ -86,12 +87,13 @@
localPageList = new ArrayList<>();
fileRef = null;
- writeFileHandle = null;
+ fileHandle = null;
}
public synchronized void open() {
size = 0;
persistentSize = 0;
+ referenceCount.set(0);
}
public synchronized void close() {
@@ -112,25 +114,29 @@
}
private void closeWriteFileHandle() {
- if (writeFileHandle != null) {
+ if (fileHandle != null) {
+ doCloseFileHandle();
+ }
+ }
+
+ private void doCloseFileHandle() {
+ if (referenceCount.decrementAndGet() == 0) {
+ // close the file if there is no more reference
try {
- ioManager.close(writeFileHandle);
+ ioManager.close(fileHandle);
} catch (IOException e) {
// Since file handle could not be closed, just ignore.
}
- writeFileHandle = null;
+ fileHandle = null;
}
}
public synchronized void write(ByteBuffer buffer) throws
HyracksDataException {
if (fileRef == null) {
- String fName = FILE_PREFIX +
String.valueOf(resultSetPartitionId.getPartition());
- fileRef = fileFactory.createUnmanagedWorkspaceFile(fName);
- writeFileHandle = ioManager.open(fileRef,
IIOManager.FileReadWriteMode.READ_WRITE,
- IIOManager.FileSyncMode.METADATA_ASYNC_DATA_SYNC);
+ initWriteFileHandle();
}
- size += ioManager.syncWrite(writeFileHandle, size, buffer);
+ size += ioManager.syncWrite(fileHandle, size, buffer);
notifyAll();
}
@@ -165,9 +171,8 @@
}
public synchronized void readClose() throws HyracksDataException {
- if (readFileHandle != null) {
- ioManager.close(readFileHandle);
- readFileHandle = null;
+ if (fileHandle != null) {
+ doCloseFileHandle();
}
}
@@ -185,10 +190,10 @@
return readSize;
}
- if (readFileHandle == null) {
+ if (fileHandle == null) {
initReadFileHandle();
}
- readSize = ioManager.syncRead(readFileHandle, offset, buffer);
+ readSize = ioManager.syncRead(fileHandle, offset, buffer);
return readSize;
}
@@ -210,10 +215,10 @@
}
if (offset < persistentSize) {
- if (readFileHandle == null) {
+ if (fileHandle == null) {
initReadFileHandle();
}
- readSize = ioManager.syncRead(readFileHandle, offset, buffer);
+ readSize = ioManager.syncRead(fileHandle, offset, buffer);
if (readSize < 0) {
throw new HyracksDataException("Premature end of file");
}
@@ -245,21 +250,17 @@
// If we do not have any pages to be given back close the write
channel since we don't write any more, return null.
if (page == null) {
- ioManager.close(writeFileHandle);
+ ioManager.close(fileHandle);
return null;
}
page.getBuffer().flip();
if (fileRef == null) {
- String fName = FILE_PREFIX +
String.valueOf(resultSetPartitionId.getPartition());
- fileRef = fileFactory.createUnmanagedWorkspaceFile(fName);
- writeFileHandle = ioManager.open(fileRef,
IIOManager.FileReadWriteMode.READ_WRITE,
- IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
- notifyAll();
+ initWriteFileHandle();
}
- long delta = ioManager.syncWrite(writeFileHandle, persistentSize,
page.getBuffer());
+ long delta = ioManager.syncWrite(fileHandle, persistentSize,
page.getBuffer());
persistentSize += delta;
return page;
}
@@ -325,8 +326,23 @@
return page;
}
+ private void initWriteFileHandle() throws HyracksDataException {
+ if (fileHandle == null) {
+ String fName = FILE_PREFIX +
String.valueOf(resultSetPartitionId.getPartition());
+ fileRef = fileFactory.createUnmanagedWorkspaceFile(fName);
+ fileHandle = ioManager.open(fileRef,
IIOManager.FileReadWriteMode.READ_WRITE,
+ IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+ if (referenceCount.get() != 0) {
+ throw new IllegalStateException("Illegal reference count " +
referenceCount.get());
+ }
+ referenceCount.set(1);
+ notifyAll();
+ }
+ }
+
private void initReadFileHandle() throws HyracksDataException {
- while (fileRef == null && !failed.get()) {
+ while (fileHandle == null && !failed.get()) {
+ // wait for writer to create fileHandle
try {
wait();
} catch (InterruptedException e) {
@@ -335,10 +351,9 @@
}
if (failed.get()) {
return;
+ } else {
+ referenceCount.incrementAndGet();
}
-
- readFileHandle = ioManager.open(fileRef,
IIOManager.FileReadWriteMode.READ_ONLY,
- IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
}
@Override
--
To view, visit https://asterix-gerrit.ics.uci.edu/2560
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ia053f8e258759881583deb509425b585bc3c4bf2
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Luo Chen <[email protected]>