Repository: drill
Updated Branches:
  refs/heads/master d105950a7 -> e1649dd7d


DRILL-5737: Hash Agg uses more than the allocated memory under certain low 
memory conditions. Note: Provides a new config parameter HASHAGG_FALLBACK_ENABLED, 
which is set to false by default. When 2 Phase HashAgg doesn't have enough 
memory to hold 2 partitions, then based on this flag it either falls back to the old 
behavior of consuming unbounded memory or fails the query.

close apache/drill#920


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/e1649dd7
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/e1649dd7
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/e1649dd7

Branch: refs/heads/master
Commit: e1649dd7d9fb2c30632f4df6ea17c483379c9775
Parents: d105950
Author: Sorabh Hamirwasia <shamirwa...@maprtech.com>
Authored: Tue Aug 22 18:20:51 2017 -0700
Committer: Aman Sinha <asi...@maprtech.com>
Committed: Mon Sep 4 23:29:28 2017 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/ExecConstants.java    |  2 +
 .../impl/aggregate/HashAggTemplate.java         | 19 +++++-
 .../server/options/SystemOptionManager.java     |  1 +
 .../src/main/resources/drill-module.conf        |  6 ++
 .../physical/impl/agg/TestHashAggrSpill.java    | 66 ++++++++++++++++++++
 5 files changed, 91 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/e1649dd7/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 95ee00e..4aaa537 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -106,6 +106,8 @@ public interface ExecConstants {
   LongValidator HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR = new 
RangeLongValidator(HASHAGG_MIN_BATCHES_PER_PARTITION_KEY, 2, 5);
   String HASHAGG_SPILL_DIRS = "drill.exec.hashagg.spill.directories";
   String HASHAGG_SPILL_FILESYSTEM = "drill.exec.hashagg.spill.fs";
+  String HASHAGG_FALLBACK_ENABLED_KEY = "drill.exec.hashagg.fallback.enabled";
+  BooleanValidator HASHAGG_FALLBACK_ENABLED_VALIDATOR = new 
BooleanValidator(HASHAGG_FALLBACK_ENABLED_KEY);
 
   String TEXT_LINE_READER_BATCH_SIZE = 
"drill.exec.storage.file.text.batch.size";
   String TEXT_LINE_READER_BUFFER_SIZE = 
"drill.exec.storage.file.text.buffer.size";

http://git-wip-us.apache.org/repos/asf/drill/blob/e1649dd7/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 9f2c2fa..a3b1ceb 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -379,11 +379,13 @@ public abstract class HashAggTemplate implements 
HashAggregator {
    */
   private void delayedSetup() {
 
+    final boolean fallbackEnabled = 
context.getOptions().getOption(ExecConstants.HASHAGG_FALLBACK_ENABLED_KEY).bool_val;
+
     // Set the number of partitions from the configuration (raise to a power 
of two, if needed)
     numPartitions = 
context.getConfig().getInt(ExecConstants.HASHAGG_NUM_PARTITIONS);
     if ( numPartitions == 1 ) {
       canSpill = false;
-      logger.warn("Spilling was disabled due to configuration setting of 
num_partitions to 1");
+      logger.warn("Spilling is disabled due to configuration setting of 
num_partitions to 1");
     }
     numPartitions = BaseAllocator.nextPowerOfTwo(numPartitions); // in case 
not a power of 2
 
@@ -401,9 +403,20 @@ public abstract class HashAggTemplate implements 
HashAggregator {
       while ( numPartitions * ( estMaxBatchSize * minBatchesPerPartition + 2 * 
1024 * 1024) > memAvail ) {
         numPartitions /= 2;
         if ( numPartitions < 2) {
-          if ( is2ndPhase ) {
+          if (is2ndPhase) {
             canSpill = false;  // 2nd phase needs at least 2 to make progress
-            logger.warn("Spilling was disabled - not enough memory available 
for internal partitioning");
+
+            if (fallbackEnabled) {
+              logger.warn("Spilling is disabled - not enough memory available 
for internal partitioning. Falling back"
+                  + " to use unbounded memory");
+            } else {
+              throw UserException.resourceError()
+                  .message(String.format("Not enough memory for internal 
partitioning and fallback mechanism for "
+                      + "HashAgg to use unbounded memory is disabled. Either 
enable fallback config %s using Alter "
+                      + "session/system command or increase memory limit for 
Drillbit",
+                      ExecConstants.HASHAGG_FALLBACK_ENABLED_KEY))
+                  .build(logger);
+            }
           }
           break;
         }

http://git-wip-us.apache.org/repos/asf/drill/blob/e1649dd7/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index d2dfc2a..4e362ff 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -118,6 +118,7 @@ public class SystemOptionManager extends BaseOptionManager 
implements OptionMana
       ExecConstants.HASHAGG_NUM_PARTITIONS_VALIDATOR,
       ExecConstants.HASHAGG_MAX_MEMORY_VALIDATOR,
       ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR, // for tuning
+      ExecConstants.HASHAGG_FALLBACK_ENABLED_VALIDATOR, // for enable/disable 
unbounded HashAgg
       ExecConstants.CAST_TO_NULLABLE_NUMERIC_OPTION,
       ExecConstants.OUTPUT_FORMAT_VALIDATOR,
       ExecConstants.PARQUET_BLOCK_SIZE_VALIDATOR,

http://git-wip-us.apache.org/repos/asf/drill/blob/e1649dd7/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf 
b/exec/java-exec/src/main/resources/drill-module.conf
index f387cda..26f0722 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -370,6 +370,12 @@ drill.exec.options:  {
     debug.validate_vectors :false,
     drill.exec.functions.cast_empty_string_to_null: false,
     drill.exec.hashagg.min_batches_per_partition : 3,
+    # Setting to control if HashAgg should fall back to the older behavior of 
consuming
+    # unbounded memory. In case of 2 phase Agg, when available memory is not 
enough
+    # to start at least 2 partitions, HashAgg falls back to this behavior. It 
can be
+    # enabled by setting this flag to true. By default it's set to false such 
that
+    # the query will fail if there is not enough memory.
+    drill.exec.hashagg.fallback.enabled: false,
     drill.exec.storage.file.partition.column.label: "dir",
     drill.exec.storage.implicit.filename.column.label: "filename",
     drill.exec.storage.implicit.filepath.column.label: "filepath",

http://git-wip-us.apache.org/repos/asf/drill/blob/e1649dd7/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java
index fe6fcbc..fb43f3e 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.agg;
 
 import ch.qos.logback.classic.Level;
 import org.apache.drill.BaseTestQuery;
+import org.apache.drill.common.exceptions.UserRemoteException;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.physical.impl.aggregate.HashAggTemplate;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
@@ -34,6 +35,7 @@ import org.junit.Test;
 
 import java.util.List;
 
+import static junit.framework.TestCase.fail;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -138,4 +140,68 @@ public class TestHashAggrSpill extends BaseTestQuery {
             runAndDump(client, sql, 1_100_000, 3, 2);
         }
     }
+
+    /**
+     * Test that when the memory limit is set too low to hold even one 
partition in a 2 Phase Hash Agg scenario and
+     * the fallback mechanism to use unbounded memory is disabled, the query fails 
in HashAgg with a Resource Error.
+     * @throws Exception
+     */
+    @Test
+    public void testHashAggrFailWithFallbackDisabed() throws Exception {
+        LogFixture.LogFixtureBuilder logBuilder = LogFixture.builder()
+            .toConsole()
+            .logger("org.apache.drill.exec.physical.impl.aggregate", 
Level.DEBUG)
+            ;
+
+        FixtureBuilder builder = ClusterFixture.builder()
+            .configProperty(ExecConstants.HASHAGG_MAX_MEMORY,74_000)
+            .configProperty(ExecConstants.HASHAGG_NUM_PARTITIONS,4)
+            .configProperty(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION,3)
+            .sessionOption(PlannerSettings.FORCE_2PHASE_AGGR_KEY,true)
+            .maxParallelization(2)
+            .saveProfiles()
+            ;
+        try (LogFixture logs = logBuilder.build();
+             ClusterFixture cluster = builder.build();
+             ClientFixture client = cluster.clientFixture()) {
+            String sql = "SELECT empid_s17, dept_i, branch_i, AVG(salary_i) 
FROM `mock`.`employee_1200K` GROUP BY empid_s17, dept_i, branch_i";
+            QueryBuilder.QuerySummary summary = 
client.queryBuilder().sql(sql).run();
+            fail();
+        } catch (Exception ex) {
+            assertTrue(ex instanceof UserRemoteException);
+            assertTrue(((UserRemoteException)ex).getErrorType() == 
UserBitShared.DrillPBError.ErrorType.RESOURCE);
+        }
+    }
+
+    /**
+     * Test that when the memory limit is set too low to hold even one 
partition in a 2 Phase Hash Agg scenario and
+     * the fallback mechanism to use unbounded memory is enabled, the query 
completes successfully without spilling.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testHashAggrSuccessWithFallbackEnabled() throws Exception {
+        LogFixture.LogFixtureBuilder logBuilder = LogFixture.builder()
+            .toConsole()
+            .logger("org.apache.drill.exec.physical.impl.aggregate", 
Level.DEBUG)
+            ;
+
+        FixtureBuilder builder = ClusterFixture.builder()
+            .configProperty(ExecConstants.HASHAGG_MAX_MEMORY,74_000)
+            .configProperty(ExecConstants.HASHAGG_NUM_PARTITIONS,4)
+            .configProperty(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION,3)
+            .sessionOption(ExecConstants.HASHAGG_FALLBACK_ENABLED_KEY, true)
+            .sessionOption(PlannerSettings.FORCE_2PHASE_AGGR_KEY,true)
+            .maxParallelization(2)
+            .saveProfiles()
+            ;
+        try (LogFixture logs = logBuilder.build();
+             ClusterFixture cluster = builder.build();
+             ClientFixture client = cluster.clientFixture()) {
+            String sql = "SELECT empid_s17, dept_i, branch_i, AVG(salary_i) 
FROM `mock`.`employee_1200K` GROUP BY empid_s17, dept_i, branch_i";
+            runAndDump(client, sql, 1_200_000, 0, 0);
+        } catch (Exception ex) {
+            fail();
+        }
+    }
 }
\ No newline at end of file

Reply via email to