Repository: drill Updated Branches: refs/heads/master d105950a7 -> e1649dd7d
DRILL-5737: Hash Agg uses more than the allocated memory under certain low memory conditions Note: Provide a new config parameter HASHAGG_FALLBACK_ENABLED which is set to false by default. When 2 Phase HashAgg doesn't have enough memory to hold 2 partitions then based on this flag it either falls back to old behavior of consuming unbounded memory or it fails the query. close apache/drill#920 Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/e1649dd7 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/e1649dd7 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/e1649dd7 Branch: refs/heads/master Commit: e1649dd7d9fb2c30632f4df6ea17c483379c9775 Parents: d105950 Author: Sorabh Hamirwasia <shamirwa...@maprtech.com> Authored: Tue Aug 22 18:20:51 2017 -0700 Committer: Aman Sinha <asi...@maprtech.com> Committed: Mon Sep 4 23:29:28 2017 -0700 ---------------------------------------------------------------------- .../org/apache/drill/exec/ExecConstants.java | 2 + .../impl/aggregate/HashAggTemplate.java | 19 +++++- .../server/options/SystemOptionManager.java | 1 + .../src/main/resources/drill-module.conf | 6 ++ .../physical/impl/agg/TestHashAggrSpill.java | 66 ++++++++++++++++++++ 5 files changed, 91 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/e1649dd7/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index 95ee00e..4aaa537 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -106,6 +106,8 @@ public interface ExecConstants { LongValidator 
HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR = new RangeLongValidator(HASHAGG_MIN_BATCHES_PER_PARTITION_KEY, 2, 5); String HASHAGG_SPILL_DIRS = "drill.exec.hashagg.spill.directories"; String HASHAGG_SPILL_FILESYSTEM = "drill.exec.hashagg.spill.fs"; + String HASHAGG_FALLBACK_ENABLED_KEY = "drill.exec.hashagg.fallback.enabled"; + BooleanValidator HASHAGG_FALLBACK_ENABLED_VALIDATOR = new BooleanValidator(HASHAGG_FALLBACK_ENABLED_KEY); String TEXT_LINE_READER_BATCH_SIZE = "drill.exec.storage.file.text.batch.size"; String TEXT_LINE_READER_BUFFER_SIZE = "drill.exec.storage.file.text.buffer.size"; http://git-wip-us.apache.org/repos/asf/drill/blob/e1649dd7/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java index 9f2c2fa..a3b1ceb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java @@ -379,11 +379,13 @@ public abstract class HashAggTemplate implements HashAggregator { */ private void delayedSetup() { + final boolean fallbackEnabled = context.getOptions().getOption(ExecConstants.HASHAGG_FALLBACK_ENABLED_KEY).bool_val; + // Set the number of partitions from the configuration (raise to a power of two, if needed) numPartitions = context.getConfig().getInt(ExecConstants.HASHAGG_NUM_PARTITIONS); if ( numPartitions == 1 ) { canSpill = false; - logger.warn("Spilling was disabled due to configuration setting of num_partitions to 1"); + logger.warn("Spilling is disabled due to configuration setting of num_partitions to 1"); } numPartitions = BaseAllocator.nextPowerOfTwo(numPartitions); // in case not a power of 2 
@@ -401,9 +403,20 @@ public abstract class HashAggTemplate implements HashAggregator { while ( numPartitions * ( estMaxBatchSize * minBatchesPerPartition + 2 * 1024 * 1024) > memAvail ) { numPartitions /= 2; if ( numPartitions < 2) { - if ( is2ndPhase ) { + if (is2ndPhase) { canSpill = false; // 2nd phase needs at least 2 to make progress - logger.warn("Spilling was disabled - not enough memory available for internal partitioning"); + + if (fallbackEnabled) { + logger.warn("Spilling is disabled - not enough memory available for internal partitioning. Falling back" + + " to use unbounded memory"); + } else { + throw UserException.resourceError() + .message(String.format("Not enough memory for internal partitioning and fallback mechanism for " + + "HashAgg to use unbounded memory is disabled. Either enable fallback config %s using Alter " + + "session/system command or increase memory limit for Drillbit", + ExecConstants.HASHAGG_FALLBACK_ENABLED_KEY)) + .build(logger); + } } break; } http://git-wip-us.apache.org/repos/asf/drill/blob/e1649dd7/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index d2dfc2a..4e362ff 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -118,6 +118,7 @@ public class SystemOptionManager extends BaseOptionManager implements OptionMana ExecConstants.HASHAGG_NUM_PARTITIONS_VALIDATOR, ExecConstants.HASHAGG_MAX_MEMORY_VALIDATOR, ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR, // for tuning + ExecConstants.HASHAGG_FALLBACK_ENABLED_VALIDATOR, // for enable/disable unbounded HashAgg 
ExecConstants.CAST_TO_NULLABLE_NUMERIC_OPTION, ExecConstants.OUTPUT_FORMAT_VALIDATOR, ExecConstants.PARQUET_BLOCK_SIZE_VALIDATOR, http://git-wip-us.apache.org/repos/asf/drill/blob/e1649dd7/exec/java-exec/src/main/resources/drill-module.conf ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index f387cda..26f0722 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -370,6 +370,12 @@ drill.exec.options: { debug.validate_vectors :false, drill.exec.functions.cast_empty_string_to_null: false, drill.exec.hashagg.min_batches_per_partition : 3, + # Setting to control if HashAgg should fallback to older behavior of consuming + # unbounded memory. In case of 2 phase Agg when available memory is not enough + # to start at least 2 partitions then HashAgg fallbacks to this case. It can be + # enabled by setting this flag to true. 
By default it's set to false such that + # query will fail if there is not enough memory + drill.exec.hashagg.fallback.enabled: false, drill.exec.storage.file.partition.column.label: "dir", drill.exec.storage.implicit.filename.column.label: "filename", drill.exec.storage.implicit.filepath.column.label: "filepath", http://git-wip-us.apache.org/repos/asf/drill/blob/e1649dd7/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java index fe6fcbc..fb43f3e 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.agg; import ch.qos.logback.classic.Level; import org.apache.drill.BaseTestQuery; +import org.apache.drill.common.exceptions.UserRemoteException; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.physical.impl.aggregate.HashAggTemplate; import org.apache.drill.exec.planner.physical.PlannerSettings; @@ -34,6 +35,7 @@ import org.junit.Test; import java.util.List; +import static junit.framework.TestCase.fail; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -138,4 +140,68 @@ public class TestHashAggrSpill extends BaseTestQuery { runAndDump(client, sql, 1_100_000, 3, 2); } } + + /** + * Test when memory limit is set to very low value even to hold one partition in 2 Phase Hash Agg scenario and + * fallback mechanism to use unbounded memory is disabled then Query Fails in HashAgg with Resource Error. 
+ * @throws Exception + */ + @Test + public void testHashAggrFailWithFallbackDisabed() throws Exception { + LogFixture.LogFixtureBuilder logBuilder = LogFixture.builder() + .toConsole() + .logger("org.apache.drill.exec.physical.impl.aggregate", Level.DEBUG) + ; + + FixtureBuilder builder = ClusterFixture.builder() + .configProperty(ExecConstants.HASHAGG_MAX_MEMORY,74_000) + .configProperty(ExecConstants.HASHAGG_NUM_PARTITIONS,4) + .configProperty(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION,3) + .sessionOption(PlannerSettings.FORCE_2PHASE_AGGR_KEY,true) + .maxParallelization(2) + .saveProfiles() + ; + try (LogFixture logs = logBuilder.build(); + ClusterFixture cluster = builder.build(); + ClientFixture client = cluster.clientFixture()) { + String sql = "SELECT empid_s17, dept_i, branch_i, AVG(salary_i) FROM `mock`.`employee_1200K` GROUP BY empid_s17, dept_i, branch_i"; + QueryBuilder.QuerySummary summary = client.queryBuilder().sql(sql).run(); + fail(); + } catch (Exception ex) { + assertTrue(ex instanceof UserRemoteException); + assertTrue(((UserRemoteException)ex).getErrorType() == UserBitShared.DrillPBError.ErrorType.RESOURCE); + } + } + + /** + * Test when memory limit is set to very low value even to hold one partition in 2 Phase Hash Agg scenario and + * fallback mechanism to use unbounded memory is enabled then query completes successfully without spilling. 
+ * + * @throws Exception + */ + @Test + public void testHashAggrSuccessWithFallbackEnabled() throws Exception { + LogFixture.LogFixtureBuilder logBuilder = LogFixture.builder() + .toConsole() + .logger("org.apache.drill.exec.physical.impl.aggregate", Level.DEBUG) + ; + + FixtureBuilder builder = ClusterFixture.builder() + .configProperty(ExecConstants.HASHAGG_MAX_MEMORY,74_000) + .configProperty(ExecConstants.HASHAGG_NUM_PARTITIONS,4) + .configProperty(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION,3) + .sessionOption(ExecConstants.HASHAGG_FALLBACK_ENABLED_KEY, true) + .sessionOption(PlannerSettings.FORCE_2PHASE_AGGR_KEY,true) + .maxParallelization(2) + .saveProfiles() + ; + try (LogFixture logs = logBuilder.build(); + ClusterFixture cluster = builder.build(); + ClientFixture client = cluster.clientFixture()) { + String sql = "SELECT empid_s17, dept_i, branch_i, AVG(salary_i) FROM `mock`.`employee_1200K` GROUP BY empid_s17, dept_i, branch_i"; + runAndDump(client, sql, 1_200_000, 0, 0); + } catch (Exception ex) { + fail(); + } + } } \ No newline at end of file