Yingyi Bu has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1513
Change subject: ASTERIXDB-1791: fix failure handling in the hash join operator.
......................................................................
ASTERIXDB-1791: fix failure handling in the hash join operator.
This change includes the following parts:
- Fix the implementation of fail() and close() in the join probe activity
so as to avoid file leakage and hanging job;
- Fix OptimizedHybridHashJoin to close files before deleting them in order
to make the disk space available;
- Fix RunFileReader to not truncate files to be deleted - truncation does not
address the root cause of unreleased disk space; deleted files that are still
open are the root cause;
- Fix NCService to reserve RAM for the OS when automatically setting -Xmx
for NCDriver.
Change-Id: I203168171e6dac16b57d2eda960823e3810e22a3
---
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCService.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
5 files changed, 39 insertions(+), 47 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/13/1513/1
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
index f0e7f0e..33b8980 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
@@ -72,16 +72,11 @@
}
public void close() throws IOException {
- channel.close();
raf.close();
}
public FileReference getFileReference() {
return fileRef;
- }
-
- public RandomAccessFile getRandomAccessFile() {
- return raf;
}
public FileChannel getFileChannel() {
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCService.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCService.java
index 5a03d3c..de6e50f 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCService.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCService.java
@@ -121,7 +121,8 @@
LOGGER.info("Using JAVA_OPTS from environment");
} else {
LOGGER.info("Using default JAVA_OPTS");
- long ramSize = ((com.sun.management.OperatingSystemMXBean)
osMXBean).getTotalPhysicalMemorySize();
+ long ramSize = ((com.sun.management.OperatingSystemMXBean)
osMXBean).getTotalPhysicalMemorySize()
+ - 1024 * 1024 * 100;
jvmargs = "-Xmx" + (int) Math.ceil(0.6 * ramSize / (1024 *
1024)) + "m";
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
index b69f377..8fae568 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
@@ -29,7 +29,6 @@
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IFileHandle;
import org.apache.hyracks.api.io.IIOManager;
-import org.apache.hyracks.control.nc.io.FileHandle;
public class RunFileReader implements IFrameReader {
private final FileReference file;
@@ -49,7 +48,7 @@
@Override
public void open() throws HyracksDataException {
// Opens RW mode because we need to truncate the given file if
required.
- handle = ioManager.open(file, IIOManager.FileReadWriteMode.READ_WRITE,
+ handle = ioManager.open(file, IIOManager.FileReadWriteMode.READ_ONLY,
IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
readPtr = 0;
}
@@ -87,8 +86,6 @@
public void close() throws HyracksDataException {
if (deleteAfterClose) {
try {
- // Truncates the file size to zero since OS might be keeping
the file for a while.
- ((FileHandle) handle).getFileChannel().truncate(0);
ioManager.close(handle);
FileUtils.deleteQuietly(file.getFile());
} catch (IOException e) {
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index 17f009e..0746b80 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -553,9 +553,10 @@
/**
* In case of failure happens, we need to clear up the generated temporary
files.
*/
- public void clearProbeTempFiles() {
+ public void clearProbeTempFiles() throws HyracksDataException {
for (int i = 0; i < probeRFWriters.length; i++) {
if (probeRFWriters[i] != null) {
+ probeRFWriters[i].close(); // Closes files first.
probeRFWriters[i].getFileReference().delete();
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index a72c0c6..4f984d3 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -385,6 +385,7 @@
private FrameTupleAppender nullResultAppender = null;
private FrameTupleAccessor probeTupleAccessor;
+ private boolean failed = false;
@Override
public void open() throws HyracksDataException {
@@ -406,37 +407,46 @@
@Override
public void fail() throws HyracksDataException {
- state.hybridHJ.clearProbeTempFiles();
+ failed = true;
writer.fail();
}
@Override
public void close() throws HyracksDataException {
- try {
- state.hybridHJ.closeProbe(writer);
-
- BitSet partitionStatus =
state.hybridHJ.getPartitionStatus();
-
- rPartbuff.reset();
- for (int pid = partitionStatus.nextSetBit(0); pid >=
0; pid = partitionStatus
- .nextSetBit(pid + 1)) {
-
- RunFileReader bReader =
state.hybridHJ.getBuildRFReader(pid);
- RunFileReader pReader =
state.hybridHJ.getProbeRFReader(pid);
-
- if (bReader == null || pReader == null) {
- if (isLeftOuter && pReader != null) {
- appendNullToProbeTuples(pReader);
- }
- continue;
- }
- int bSize =
state.hybridHJ.getBuildPartitionSizeInTup(pid);
- int pSize =
state.hybridHJ.getProbePartitionSizeInTup(pid);
- joinPartitionPair(bReader, pReader, bSize, pSize,
1);
+ if (failed) {
+ try {
+ state.hybridHJ.clearProbeTempFiles(); // Clear
temp files if fail is called.
+ } finally {
+ writer.close(); // writer should always be closed.
}
+ } else {
+ try {
+ state.hybridHJ.closeProbe(writer);
+ BitSet partitionStatus =
state.hybridHJ.getPartitionStatus();
+ rPartbuff.reset();
+ for (int pid = partitionStatus.nextSetBit(0); pid
>= 0; pid = partitionStatus
+ .nextSetBit(pid + 1)) {
+ RunFileReader bReader =
state.hybridHJ.getBuildRFReader(pid);
+ RunFileReader pReader =
state.hybridHJ.getProbeRFReader(pid);
- } finally {
- writer.close();
+ if (bReader == null || pReader == null) {
+ if (isLeftOuter && pReader != null) {
+ appendNullToProbeTuples(pReader);
+ }
+ continue;
+ }
+ int bSize =
state.hybridHJ.getBuildPartitionSizeInTup(pid);
+ int pSize =
state.hybridHJ.getProbePartitionSizeInTup(pid);
+ joinPartitionPair(bReader, pReader, bSize,
pSize, 1);
+ }
+ } catch (Exception e) {
+ // Since writer.nextFrame() is called in the above
"try" body, we have to call writer.fail()
+ // to propagate failure signal to the writer, if
there is an exception thrown.
+ writer.fail();
+ throw e;
+ } finally {
+ writer.close();
+ }
}
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("OptimizedHybridHashJoin closed its probe
phase");
@@ -738,18 +748,6 @@
};
return op;
}
- }
-
- public void setSkipInMemHJ(boolean b) {
- skipInMemoryHJ = b;
- }
-
- public void setForceNLJ(boolean b) {
- forceNLJ = b;
- }
-
- public void setForceRR(boolean b) {
- forceRoleReversal = !isLeftOuter && b;
}
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/1513
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I203168171e6dac16b57d2eda960823e3810e22a3
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Yingyi Bu <[email protected]>