Yingyi Bu has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1513

Change subject: ASTERIXDB-1791: fix failure handling in the hash join operator.
......................................................................

ASTERIXDB-1791: fix failure handling in the hash join operator.

This change includes the following parts:
- Fix the implementation of fail() and close() in the join probe activity
  to avoid leaking temporary files and hanging the job;
- Fix OptimizedHybridHashJoin to close probe run files before deleting them,
  so that the disk space is actually released;
- Fix RunFileReader to no longer truncate files before deleting them, and to
  open run files read-only. Truncation did not address the unreleased disk
  space; deleted files that are still held open are the root cause;
- Fix NCService to reserve RAM for the OS (100 MB is subtracted from the total
  physical memory) when automatically setting -Xmx for NCDriver.

Change-Id: I203168171e6dac16b57d2eda960823e3810e22a3
---
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCService.java
M 
hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
5 files changed, 39 insertions(+), 47 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/13/1513/1

diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
index f0e7f0e..33b8980 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
@@ -72,16 +72,11 @@
     }
 
     public void close() throws IOException {
-        channel.close();
         raf.close();
     }
 
     public FileReference getFileReference() {
         return fileRef;
-    }
-
-    public RandomAccessFile getRandomAccessFile() {
-        return raf;
     }
 
     public FileChannel getFileChannel() {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCService.java
index 5a03d3c..de6e50f 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCService.java
@@ -121,7 +121,8 @@
                 LOGGER.info("Using JAVA_OPTS from environment");
             } else {
                 LOGGER.info("Using default JAVA_OPTS");
-                long ramSize = ((com.sun.management.OperatingSystemMXBean) 
osMXBean).getTotalPhysicalMemorySize();
+                long ramSize = ((com.sun.management.OperatingSystemMXBean) 
osMXBean).getTotalPhysicalMemorySize()
+                        - 1024 * 1024 * 100;
                 jvmargs = "-Xmx" + (int) Math.ceil(0.6 * ramSize / (1024 * 
1024)) + "m";
             }
         }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
index b69f377..8fae568 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
@@ -29,7 +29,6 @@
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IFileHandle;
 import org.apache.hyracks.api.io.IIOManager;
-import org.apache.hyracks.control.nc.io.FileHandle;
 
 public class RunFileReader implements IFrameReader {
     private final FileReference file;
@@ -49,7 +48,7 @@
     @Override
     public void open() throws HyracksDataException {
         // Opens RW mode because we need to truncate the given file if 
required.
-        handle = ioManager.open(file, IIOManager.FileReadWriteMode.READ_WRITE,
+        handle = ioManager.open(file, IIOManager.FileReadWriteMode.READ_ONLY,
                 IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
         readPtr = 0;
     }
@@ -87,8 +86,6 @@
     public void close() throws HyracksDataException {
         if (deleteAfterClose) {
             try {
-                // Truncates the file size to zero since OS might be keeping 
the file for a while.
-                ((FileHandle) handle).getFileChannel().truncate(0);
                 ioManager.close(handle);
                 FileUtils.deleteQuietly(file.getFile());
             } catch (IOException e) {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index 17f009e..0746b80 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -553,9 +553,10 @@
     /**
      * In case of failure happens, we need to clear up the generated temporary 
files.
      */
-    public void clearProbeTempFiles() {
+    public void clearProbeTempFiles() throws HyracksDataException {
         for (int i = 0; i < probeRFWriters.length; i++) {
             if (probeRFWriters[i] != null) {
+                probeRFWriters[i].close(); // Closes files first.
                 probeRFWriters[i].getFileReference().delete();
             }
         }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index a72c0c6..4f984d3 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -385,6 +385,7 @@
 
                 private FrameTupleAppender nullResultAppender = null;
                 private FrameTupleAccessor probeTupleAccessor;
+                private boolean failed = false;
 
                 @Override
                 public void open() throws HyracksDataException {
@@ -406,37 +407,46 @@
 
                 @Override
                 public void fail() throws HyracksDataException {
-                    state.hybridHJ.clearProbeTempFiles();
+                    failed = true;
                     writer.fail();
                 }
 
                 @Override
                 public void close() throws HyracksDataException {
-                    try {
-                        state.hybridHJ.closeProbe(writer);
-
-                        BitSet partitionStatus = 
state.hybridHJ.getPartitionStatus();
-
-                        rPartbuff.reset();
-                        for (int pid = partitionStatus.nextSetBit(0); pid >= 
0; pid = partitionStatus
-                                .nextSetBit(pid + 1)) {
-
-                            RunFileReader bReader = 
state.hybridHJ.getBuildRFReader(pid);
-                            RunFileReader pReader = 
state.hybridHJ.getProbeRFReader(pid);
-
-                            if (bReader == null || pReader == null) {
-                                if (isLeftOuter && pReader != null) {
-                                    appendNullToProbeTuples(pReader);
-                                }
-                                continue;
-                            }
-                            int bSize = 
state.hybridHJ.getBuildPartitionSizeInTup(pid);
-                            int pSize = 
state.hybridHJ.getProbePartitionSizeInTup(pid);
-                            joinPartitionPair(bReader, pReader, bSize, pSize, 
1);
+                    if (failed) {
+                        try {
+                            state.hybridHJ.clearProbeTempFiles(); // Clear 
temp files if fail is called.
+                        } finally {
+                            writer.close(); // writer should always be closed.
                         }
+                    } else {
+                        try {
+                            state.hybridHJ.closeProbe(writer);
+                            BitSet partitionStatus = 
state.hybridHJ.getPartitionStatus();
+                            rPartbuff.reset();
+                            for (int pid = partitionStatus.nextSetBit(0); pid 
>= 0; pid = partitionStatus
+                                    .nextSetBit(pid + 1)) {
+                                RunFileReader bReader = 
state.hybridHJ.getBuildRFReader(pid);
+                                RunFileReader pReader = 
state.hybridHJ.getProbeRFReader(pid);
 
-                    } finally {
-                        writer.close();
+                                if (bReader == null || pReader == null) {
+                                    if (isLeftOuter && pReader != null) {
+                                        appendNullToProbeTuples(pReader);
+                                    }
+                                    continue;
+                                }
+                                int bSize = 
state.hybridHJ.getBuildPartitionSizeInTup(pid);
+                                int pSize = 
state.hybridHJ.getProbePartitionSizeInTup(pid);
+                                joinPartitionPair(bReader, pReader, bSize, 
pSize, 1);
+                                }
+                        } catch (Exception e) {
+                            // Since writer.nextFrame() is called in the above 
"try" body, we have to call writer.fail()
+                            // to propagate failure signal to the writer, if 
there is an exception thrown.
+                            writer.fail();
+                            throw e;
+                        } finally {
+                            writer.close();
+                        }
                     }
                     if (LOGGER.isLoggable(Level.FINE)) {
                         LOGGER.fine("OptimizedHybridHashJoin closed its probe 
phase");
@@ -738,18 +748,6 @@
             };
             return op;
         }
-    }
-
-    public void setSkipInMemHJ(boolean b) {
-        skipInMemoryHJ = b;
-    }
-
-    public void setForceNLJ(boolean b) {
-        forceNLJ = b;
-    }
-
-    public void setForceRR(boolean b) {
-        forceRoleReversal = !isLeftOuter && b;
     }
 
 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1513
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I203168171e6dac16b57d2eda960823e3810e22a3
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Yingyi Bu <[email protected]>

Reply via email to