DRILL-3993: Add check for OOM in HashAgg
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/d06a7cbb Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/d06a7cbb Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/d06a7cbb Branch: refs/heads/master Commit: d06a7cbbd7e5046e5017ffeb010b8b3c74123184 Parents: 3c9093e Author: Volodymyr Vysotskyi <vvo...@gmail.com> Authored: Fri Dec 29 17:36:08 2017 +0200 Committer: Volodymyr Vysotskyi <vvo...@gmail.com> Committed: Tue Jan 16 12:10:13 2018 +0200 ---------------------------------------------------------------------- .../physical/impl/aggregate/HashAggTemplate.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/d06a7cbb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java index 4a81f3c..4b43b22 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java @@ -1280,6 +1280,8 @@ public abstract class HashAggTemplate implements HashAggregator { logger.trace("Reserved memory runs short, trying to {} a partition and retry Hash Table put() again.", is1stPhase ? "early return" : "spill"); + checkForSpillPossibility(currentPartition); + doSpill(currentPartition); // spill to free some memory retrySameIndex = true; @@ -1303,6 +1305,8 @@ public abstract class HashAggTemplate implements HashAggregator { long memDiff = allocator.getAllocatedMemory() - allocatedBeforeHTput; if ( memDiff > 0 ) { logger.warn("Leak: HashTable put() OOM left behind {} bytes allocated",memDiff); } + checkForSpillPossibility(currentPartition); + doSpill(currentPartition); // spill to free some memory retrySameIndex = true; @@ -1379,6 +1383,17 @@ public abstract class HashAggTemplate implements HashAggregator { } } + /** + * Checks that spill is possible, otherwise throws {@link OutOfMemoryException}. + * + * @param currentPartition the partition that hit the memory limit + */ + private void checkForSpillPossibility(int currentPartition) { + if (chooseAPartitionToFlush(currentPartition, true) < 0) { + throw new OutOfMemoryException(getOOMErrorMsg("AGGR")); + } + } + private void spillIfNeeded(int currentPartition) { spillIfNeeded(currentPartition, false);} private void doSpill(int currentPartition) { spillIfNeeded(currentPartition, true);} /**