Repository: impala
Updated Branches:
  refs/heads/master 93606e604 -> 80edf3701


IMPALA-7351: Improve memory estimates for Kudu Scan Nodes

This patch adds memory estimates for Kudu scan nodes, based on the
empirically derived estimates of a scan's memory consumption
that were added in IMPALA-7096.

Testing:
Modified resource requirements planner test.

Change-Id: If9bb52530271b0bff91311a67d222a2e9fac1229
Reviewed-on: http://gerrit.cloudera.org:8080/11440
Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/3fabc2de
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/3fabc2de
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/3fabc2de

Branch: refs/heads/master
Commit: 3fabc2de4771349079bcd9dc8bdcb267f43b2a6b
Parents: 93606e6
Author: Bikramjeet Vig <bikramjeet....@cloudera.com>
Authored: Tue Aug 28 14:39:16 2018 -0700
Committer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Committed: Thu Oct 4 22:04:02 2018 +0000

----------------------------------------------------------------------
 be/src/util/backend-gflag-util.cc               |  7 ++
 common/thrift/BackendGflags.thrift              |  4 ++
 .../org/apache/impala/planner/HdfsScanNode.java | 22 +-----
 .../org/apache/impala/planner/KuduScanNode.java | 25 ++++++-
 .../org/apache/impala/planner/ScanNode.java     | 38 ++++++++++
 .../queries/PlannerTest/kudu-selectivity.test   | 52 +++++++-------
 .../PlannerTest/min-max-runtime-filters.test    | 24 +++----
 .../PlannerTest/resource-requirements.test      | 74 ++++++++++++++++++++
 .../queries/PlannerTest/tpch-kudu.test          | 38 +++++-----
 9 files changed, 205 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/3fabc2de/be/src/util/backend-gflag-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/backend-gflag-util.cc 
b/be/src/util/backend-gflag-util.cc
index 50d72bd..09975a8 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -63,6 +63,9 @@ DECLARE_bool(invalidate_tables_on_memory_pressure);
 DECLARE_double(invalidate_tables_gc_old_gen_full_threshold);
 DECLARE_double(invalidate_tables_fraction_on_memory_pressure);
 DECLARE_int32(local_catalog_max_fetch_retries);
+DECLARE_int64(kudu_scanner_thread_estimated_bytes_per_column);
+DECLARE_int64(kudu_scanner_thread_max_estimated_bytes);
+
 namespace impala {
 
 Status GetThriftBackendGflags(JNIEnv* jni_env, jbyteArray* cfg_bytes) {
@@ -119,6 +122,10 @@ Status GetThriftBackendGflags(JNIEnv* jni_env, jbyteArray* 
cfg_bytes) {
   cfg.__set_invalidate_tables_fraction_on_memory_pressure(
       FLAGS_invalidate_tables_fraction_on_memory_pressure);
   
cfg.__set_local_catalog_max_fetch_retries(FLAGS_local_catalog_max_fetch_retries);
+  cfg.__set_kudu_scanner_thread_estimated_bytes_per_column(
+      FLAGS_kudu_scanner_thread_estimated_bytes_per_column);
+  cfg.__set_kudu_scanner_thread_max_estimated_bytes(
+      FLAGS_kudu_scanner_thread_max_estimated_bytes);
   RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &cfg, cfg_bytes));
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/3fabc2de/common/thrift/BackendGflags.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/BackendGflags.thrift 
b/common/thrift/BackendGflags.thrift
index 10003f5..5f971c0 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -99,4 +99,8 @@ struct TBackendGflags {
   36: required double invalidate_tables_fraction_on_memory_pressure
 
   37: required i32 local_catalog_max_fetch_retries
+
+  38: required i64 kudu_scanner_thread_estimated_bytes_per_column
+
+  39: required i64 kudu_scanner_thread_max_estimated_bytes
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/3fabc2de/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 8c10e5f..63c4307 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -140,11 +140,6 @@ public class HdfsScanNode extends ScanNode {
   // threads. Corresponds to the default value of --num_threads_per_core in 
the backend.
   private static final int MAX_THREAD_TOKENS_PER_CORE = 3;
 
-  // Factor capturing the worst-case deviation from a uniform distribution of 
scan ranges
-  // among nodes. The factor of 1.2 means that a particular node may have 20% 
more
-  // scan ranges than would have been estimated assuming a uniform 
distribution.
-  private static final double SCAN_RANGE_SKEW_FACTOR = 1.2;
-
   // The minimum amount of memory we estimate a scan will use. The number is
   // derived experimentally: running metadata-only Parquet count(*) scans on 
TPC-H
   // lineitem and TPC-DS store_sales of different sizes resulted in memory 
consumption
@@ -1376,8 +1371,7 @@ public class HdfsScanNode extends ScanNode {
         // excluding partition columns and columns that are populated from 
file metadata.
         partitionScanRange = columnReservations.size();
       } else {
-        partitionScanRange = (int) Math.ceil(
-            ((double) scanRangeSize / (double) numNodes_) * 
SCAN_RANGE_SKEW_FACTOR);
+        partitionScanRange = estimatePerHostScanRanges(scanRangeSize);
       }
       // From the resource management purview, we want to conservatively 
estimate memory
       // consumption based on the partition with the highest memory 
requirements.
@@ -1388,18 +1382,8 @@ public class HdfsScanNode extends ScanNode {
 
     // The non-MT scan node requires at least one scanner thread.
     int requiredThreads = useMtScanNode_ ? 0 : 1;
-    int maxScannerThreads;
-    if (queryOptions.getMt_dop() >= 1) {
-      maxScannerThreads = 1;
-    } else {
-      maxScannerThreads = Math.min(perHostScanRanges, 
RuntimeEnv.INSTANCE.getNumCores());
-      // Account for the max scanner threads query option.
-      if (queryOptions.isSetNum_scanner_threads() &&
-          queryOptions.getNum_scanner_threads() > 0) {
-        maxScannerThreads =
-            Math.min(maxScannerThreads, queryOptions.getNum_scanner_threads());
-      }
-    }
+    int maxScannerThreads = computeMaxNumberOfScannerThreads(queryOptions,
+        perHostScanRanges);
 
     long avgScanRangeBytes = (long) Math.ceil(totalBytes_ / (double) 
scanRangeSize);
     // The +1 accounts for an extra I/O buffer to read past the scan range due 
to a

http://git-wip-us.apache.org/repos/asf/impala/blob/3fabc2de/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
index 95b31a2..56c7602 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
@@ -39,6 +39,7 @@ import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.KuduColumn;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.ImpalaRuntimeException;
+import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TKuduScanNode;
 import org.apache.impala.thrift.TNetworkAddress;
@@ -225,7 +226,7 @@ public class KuduScanNode extends ScanNode {
 
       TScanRangeLocationList locs = new TScanRangeLocationList();
       locs.setScan_range(scanRange);
-      locs.locations = locations;
+      locs.setLocations(locations);
       scanRangeSpecs_.addToConcrete_ranges(locs);
     }
   }
@@ -272,9 +273,27 @@ public class KuduScanNode extends ScanNode {
 
   @Override
   public void computeNodeResourceProfile(TQueryOptions queryOptions) {
-    // TODO: add a memory estimate when we revisit memory estimates overall.
+    // The bulk of memory used by Kudu scan node is generally utilized by the
+    // RowbatchQueue plus the row batches filled in by the scanner threads and
+    // waiting to be queued into the RowbatchQueue. Due to a number of factors
+    // like variable length string columns, mem pool usage pattern,
+    // variability of the number of scanner threads being spawned and the
+    // variability of the average RowbatchQueue size, it is increasingly
+    // difficult to precisely estimate the memory usage. Therefore, we fall back
+    // to a simpler approach of using empirically derived estimates.
+    int numOfScanRanges = scanRangeSpecs_.getConcrete_rangesSize();
+    int perHostScanRanges = estimatePerHostScanRanges(numOfScanRanges);
+    int maxScannerThreads = computeMaxNumberOfScannerThreads(queryOptions,
+        perHostScanRanges);
+    int num_cols = desc_.getSlots().size();
+    long estimated_bytes_per_column_per_thread = 
BackendConfig.INSTANCE.getBackendCfg().
+        kudu_scanner_thread_estimated_bytes_per_column;
+    long max_estimated_bytes_per_thread = 
BackendConfig.INSTANCE.getBackendCfg().
+        kudu_scanner_thread_max_estimated_bytes;
+    long mem_estimate_per_thread = Math.min(num_cols *
+        estimated_bytes_per_column_per_thread, max_estimated_bytes_per_thread);
     nodeResourceProfile_ = new ResourceProfileBuilder()
-        .setMemEstimateBytes(0)
+        .setMemEstimateBytes(mem_estimate_per_thread * maxScannerThreads)
         .setThreadReservation(useMtScanNode_ ? 0 : 1).build();
   }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/3fabc2de/fe/src/main/java/org/apache/impala/planner/ScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/ScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/ScanNode.java
index f2daef5..76e920b 100644
--- a/fe/src/main/java/org/apache/impala/planner/ScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/ScanNode.java
@@ -25,7 +25,9 @@ import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.HdfsFileFormat;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.NotImplementedException;
+import org.apache.impala.common.RuntimeEnv;
 import org.apache.impala.thrift.TNetworkAddress;
+import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TScanRangeSpec;
 import org.apache.impala.thrift.TTableStats;
 
@@ -38,6 +40,13 @@ import com.google.common.collect.Lists;
  * Representation of the common elements of all scan nodes.
  */
 abstract public class ScanNode extends PlanNode {
+
+  // Factor capturing the worst-case deviation from a uniform distribution of 
scan ranges
+  // among nodes. The factor of 1.2 means that a particular node may have 20% 
more
+  // scan ranges than would have been estimated assuming a uniform 
distribution.
+  // Used for HDFS and Kudu Scan node estimations.
+  protected static final double SCAN_RANGE_SKEW_FACTOR = 1.2;
+
   protected final TupleDescriptor desc_;
 
   // Total number of rows this node is expected to process
@@ -217,6 +226,35 @@ abstract public class ScanNode extends PlanNode {
   }
 
   /**
+   * Helper function that returns the estimated number of scan ranges that 
would
+   * be assigned to each host based on the total number of scan ranges.
+   */
+  protected int estimatePerHostScanRanges(long totalNumOfScanRanges) {
+    return (int) Math.ceil(((double) totalNumOfScanRanges / (double) 
numNodes_) *
+        SCAN_RANGE_SKEW_FACTOR);
+  }
+
+  /**
+   * Helper function that returns the max number of scanner threads that can be
+   * spawned by a scan node.
+   */
+  protected int computeMaxNumberOfScannerThreads(TQueryOptions queryOptions,
+      int perHostScanRanges) {
+    // With mt_dop >= 1 the scan node is limited to a single scanner thread.
+    if (queryOptions.getMt_dop() >= 1) {
+      return 1;
+    }
+    int maxScannerThreads = Math.min(perHostScanRanges,
+        RuntimeEnv.INSTANCE.getNumCores());
+    // Account for the max scanner threads query option.
+    if (queryOptions.isSetNum_scanner_threads() &&
+        queryOptions.getNum_scanner_threads() > 0) {
+      maxScannerThreads = Math.min(maxScannerThreads,
+          queryOptions.getNum_scanner_threads());
+    }
+    return maxScannerThreads;
+  }
+  /**
    * Returns true if this node has conjuncts to be evaluated by Impala against 
the scan
    * tuple.
    */

http://git-wip-us.apache.org/repos/asf/impala/blob/3fabc2de/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test
----------------------------------------------------------------------
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test
 
b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test
index e8c91b0..8a59968 100644
--- 
a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test
@@ -1,13 +1,13 @@
 select * from functional_kudu.zipcode_incomes where id = '8600000US00601'
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=2
+Per-Host Resources: mem-estimate=1.88MB mem-reservation=0B thread-reservation=2
   PLAN-ROOT SINK
   |  mem-estimate=0B mem-reservation=0B thread-reservation=0
   |
   00:SCAN KUDU [functional_kudu.zipcode_incomes]
      kudu predicates: id = '8600000US00601'
-     mem-estimate=0B mem-reservation=0B thread-reservation=1
+     mem-estimate=1.88MB mem-reservation=0B thread-reservation=1
      tuple-ids=0 row-size=124B cardinality=1
      in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
@@ -22,12 +22,12 @@ Per-Host Resources: mem-estimate=0B mem-reservation=0B 
thread-reservation=1
      in pipelines: 00(GETNEXT)
 
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
-Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=2
+Per-Host Resources: mem-estimate=1.88MB mem-reservation=0B thread-reservation=2
   DATASTREAM SINK [FRAGMENT=F01, EXCHANGE=01, UNPARTITIONED]
   |  mem-estimate=0B mem-reservation=0B thread-reservation=0
   00:SCAN KUDU [functional_kudu.zipcode_incomes]
      kudu predicates: id = '8600000US00601'
-     mem-estimate=0B mem-reservation=0B thread-reservation=1
+     mem-estimate=1.88MB mem-reservation=0B thread-reservation=1
      tuple-ids=0 row-size=124B cardinality=1
      in pipelines: 00(GETNEXT)
 ====
@@ -35,14 +35,14 @@ Per-Host Resources: mem-estimate=0B mem-reservation=0B 
thread-reservation=2
 select * from functional_kudu.zipcode_incomes where id != '1' and zip = '2'
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=2
+Per-Host Resources: mem-estimate=3.75MB mem-reservation=0B thread-reservation=2
   PLAN-ROOT SINK
   |  mem-estimate=0B mem-reservation=0B thread-reservation=0
   |
   00:SCAN KUDU [functional_kudu.zipcode_incomes]
      predicates: id != '1'
      kudu predicates: zip = '2'
-     mem-estimate=0B mem-reservation=0B thread-reservation=1
+     mem-estimate=3.75MB mem-reservation=0B thread-reservation=1
      tuple-ids=0 row-size=124B cardinality=1
      in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
@@ -57,26 +57,26 @@ Per-Host Resources: mem-estimate=0B mem-reservation=0B 
thread-reservation=1
      in pipelines: 00(GETNEXT)
 
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
-Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=2
+Per-Host Resources: mem-estimate=3.75MB mem-reservation=0B thread-reservation=2
   DATASTREAM SINK [FRAGMENT=F01, EXCHANGE=01, UNPARTITIONED]
   |  mem-estimate=0B mem-reservation=0B thread-reservation=0
   00:SCAN KUDU [functional_kudu.zipcode_incomes]
      predicates: id != '1'
      kudu predicates: zip = '2'
-     mem-estimate=0B mem-reservation=0B thread-reservation=1
+     mem-estimate=3.75MB mem-reservation=0B thread-reservation=1
      tuple-ids=0 row-size=124B cardinality=1
      in pipelines: 00(GETNEXT)
 ====
 select * from functional_kudu.zipcode_incomes where id > '1' and zip > '2'
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=2
+Per-Host Resources: mem-estimate=3.75MB mem-reservation=0B thread-reservation=2
   PLAN-ROOT SINK
   |  mem-estimate=0B mem-reservation=0B thread-reservation=0
   |
   00:SCAN KUDU [functional_kudu.zipcode_incomes]
      kudu predicates: zip > '2', id > '1'
-     mem-estimate=0B mem-reservation=0B thread-reservation=1
+     mem-estimate=3.75MB mem-reservation=0B thread-reservation=1
      tuple-ids=0 row-size=124B cardinality=3317
      in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
@@ -91,25 +91,25 @@ Per-Host Resources: mem-estimate=0B mem-reservation=0B 
thread-reservation=1
      in pipelines: 00(GETNEXT)
 
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
-Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=2
+Per-Host Resources: mem-estimate=3.75MB mem-reservation=0B thread-reservation=2
   DATASTREAM SINK [FRAGMENT=F01, EXCHANGE=01, UNPARTITIONED]
   |  mem-estimate=0B mem-reservation=0B thread-reservation=0
   00:SCAN KUDU [functional_kudu.zipcode_incomes]
      kudu predicates: zip > '2', id > '1'
-     mem-estimate=0B mem-reservation=0B thread-reservation=1
+     mem-estimate=3.75MB mem-reservation=0B thread-reservation=1
      tuple-ids=0 row-size=124B cardinality=3317
      in pipelines: 00(GETNEXT)
 ====
 select * from functional_kudu.zipcode_incomes where id = '1' or id = '2' or 
zip = '3'
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=2
+Per-Host Resources: mem-estimate=3.75MB mem-reservation=0B thread-reservation=2
   PLAN-ROOT SINK
   |  mem-estimate=0B mem-reservation=0B thread-reservation=0
   |
   00:SCAN KUDU [functional_kudu.zipcode_incomes]
      predicates: id IN ('1', '2') OR zip = '3'
-     mem-estimate=0B mem-reservation=0B thread-reservation=1
+     mem-estimate=3.75MB mem-reservation=0B thread-reservation=1
      tuple-ids=0 row-size=124B cardinality=3
      in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
@@ -124,12 +124,12 @@ Per-Host Resources: mem-estimate=0B mem-reservation=0B 
thread-reservation=1
      in pipelines: 00(GETNEXT)
 
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
-Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=2
+Per-Host Resources: mem-estimate=3.75MB mem-reservation=0B thread-reservation=2
   DATASTREAM SINK [FRAGMENT=F01, EXCHANGE=01, UNPARTITIONED]
   |  mem-estimate=0B mem-reservation=0B thread-reservation=0
   00:SCAN KUDU [functional_kudu.zipcode_incomes]
      predicates: id IN ('1', '2') OR zip = '3'
-     mem-estimate=0B mem-reservation=0B thread-reservation=1
+     mem-estimate=3.75MB mem-reservation=0B thread-reservation=1
      tuple-ids=0 row-size=124B cardinality=3
      in pipelines: 00(GETNEXT)
 ====
@@ -159,14 +159,14 @@ string_col not in ("bar") and
 id in (int_col)
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=2
+Per-Host Resources: mem-estimate=9.75MB mem-reservation=0B thread-reservation=2
   PLAN-ROOT SINK
   |  mem-estimate=0B mem-reservation=0B thread-reservation=0
   |
   00:SCAN KUDU [functional_kudu.alltypes]
      predicates: id IN (int_col), bigint_col IN (9999999999999999999), 
double_col IN (CAST('inf' AS DOUBLE)), float_col IN (CAST('NaN' AS FLOAT)), 
int_col IN (9999999999), smallint_col IN (99999, 2), tinyint_col IN (1, 999), 
bool_col IN (1), string_col NOT IN ('bar')
      kudu predicates: double_col IN (0.0), float_col IN (0.0), bigint_col IN 
(1, 2), int_col IN (1, 2), smallint_col IN (0, 2), string_col IN ('foo', 'foo   
    '), tinyint_col IN (1, 2), bool_col IN (TRUE)
-     mem-estimate=0B mem-reservation=0B thread-reservation=1
+     mem-estimate=9.75MB mem-reservation=0B thread-reservation=1
      tuple-ids=0 row-size=97B cardinality=5
      in pipelines: 00(GETNEXT)
 ====
@@ -174,13 +174,13 @@ Per-Host Resources: mem-estimate=0B mem-reservation=0B 
thread-reservation=2
 select * from functional_kudu.decimal_tbl where d1 in (1234, 12345);
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=2
+Per-Host Resources: mem-estimate=4.50MB mem-reservation=0B thread-reservation=2
   PLAN-ROOT SINK
   |  mem-estimate=0B mem-reservation=0B thread-reservation=0
   |
   00:SCAN KUDU [functional_kudu.decimal_tbl]
      kudu predicates: d1 IN (1234, 12345)
-     mem-estimate=0B mem-reservation=0B thread-reservation=1
+     mem-estimate=4.50MB mem-reservation=0B thread-reservation=1
      tuple-ids=0 row-size=56B cardinality=2
      in pipelines: 00(GETNEXT)
 ====
@@ -192,14 +192,14 @@ timestamp_col > (nanoseconds_add(cast('1987-05-19 
00:00:00' as timestamp), 10))
 timestamp_col < (seconds_add(cast('9999-12-31 24:59:59' as timestamp), 10))
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=2
+Per-Host Resources: mem-estimate=9.75MB mem-reservation=0B thread-reservation=2
   PLAN-ROOT SINK
   |  mem-estimate=0B mem-reservation=0B thread-reservation=0
   |
   00:SCAN KUDU [functional_kudu.alltypes]
      predicates: CAST(date_string_col AS TINYINT) IS NULL, timestamp_col < NULL
      kudu predicates: smallint_col IS NULL, tinyint_col IS NOT NULL, 
timestamp_col > TIMESTAMP '1987-05-19 00:00:00.000000010'
-     mem-estimate=0B mem-reservation=0B thread-reservation=1
+     mem-estimate=9.75MB mem-reservation=0B thread-reservation=1
      tuple-ids=0 row-size=97B cardinality=730
      in pipelines: 00(GETNEXT)
 ====
@@ -208,13 +208,13 @@ timestamp_col in (cast('2010-03-01 00:00:00' as 
timestamp),
                   cast('2010-03-01 00:01:00' as timestamp))
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=2
+Per-Host Resources: mem-estimate=9.75MB mem-reservation=0B thread-reservation=2
   PLAN-ROOT SINK
   |  mem-estimate=0B mem-reservation=0B thread-reservation=0
   |
   00:SCAN KUDU [functional_kudu.alltypes]
      kudu predicates: timestamp_col IN (TIMESTAMP '2010-03-01 00:00:00', 
TIMESTAMP '2010-03-01 00:01:00')
-     mem-estimate=0B mem-reservation=0B thread-reservation=1
+     mem-estimate=9.75MB mem-reservation=0B thread-reservation=1
      tuple-ids=0 row-size=97B cardinality=1
      in pipelines: 00(GETNEXT)
 ====
@@ -224,13 +224,13 @@ timestamp_col in (cast('2010-03-01 00:00:00' as 
timestamp),
                   cast('2010-03-01 00:01:00' as timestamp))
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=2
+Per-Host Resources: mem-estimate=9.75MB mem-reservation=0B thread-reservation=2
   PLAN-ROOT SINK
   |  mem-estimate=0B mem-reservation=0B thread-reservation=0
   |
   00:SCAN KUDU [functional_kudu.alltypes]
      predicates: timestamp_col IN (TIMESTAMP '2010-03-01 00:00:00', NULL, 
TIMESTAMP '2010-03-01 00:01:00')
-     mem-estimate=0B mem-reservation=0B thread-reservation=1
+     mem-estimate=9.75MB mem-reservation=0B thread-reservation=1
      tuple-ids=0 row-size=97B cardinality=3
      in pipelines: 00(GETNEXT)
 ====

http://git-wip-us.apache.org/repos/asf/impala/blob/3fabc2de/testdata/workloads/functional-planner/queries/PlannerTest/min-max-runtime-filters.test
----------------------------------------------------------------------
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/min-max-runtime-filters.test
 
b/testdata/workloads/functional-planner/queries/PlannerTest/min-max-runtime-filters.test
index ae8493e..0ddccdc 100644
--- 
a/testdata/workloads/functional-planner/queries/PlannerTest/min-max-runtime-filters.test
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/min-max-runtime-filters.test
@@ -3,7 +3,7 @@ select count(*) from functional_kudu.alltypes a, 
functional_kudu.alltypestiny b
 where a.int_col = b.tinyint_col + 1 and a.string_col = b.string_col
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=11.94MB mem-reservation=1.94MB 
thread-reservation=3
+|  Per-Host Resources: mem-estimate=13.44MB mem-reservation=1.94MB 
thread-reservation=3
 PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
@@ -22,13 +22,13 @@ PLAN-ROOT SINK
 |  in pipelines: 00(GETNEXT), 01(OPEN)
 |
 |--01:SCAN KUDU [functional_kudu.alltypestiny b]
-|     mem-estimate=0B mem-reservation=0B thread-reservation=1
+|     mem-estimate=1.50MB mem-reservation=0B thread-reservation=1
 |     tuple-ids=1 row-size=18B cardinality=8
 |     in pipelines: 01(GETNEXT)
 |
 00:SCAN KUDU [functional_kudu.alltypes a]
    runtime filters: RF002[min_max] -> a.string_col, RF003[min_max] -> a.int_col
-   mem-estimate=0B mem-reservation=0B thread-reservation=1
+   mem-estimate=1.50MB mem-reservation=0B thread-reservation=1
    tuple-ids=0 row-size=21B cardinality=7300
    in pipelines: 00(GETNEXT)
 ====
@@ -40,7 +40,7 @@ where a.int_col + 1 = b.int_col
     and a.tinyint_col is not distinct from b.tinyint_col
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=11.94MB mem-reservation=1.94MB 
thread-reservation=3
+|  Per-Host Resources: mem-estimate=14.19MB mem-reservation=1.94MB 
thread-reservation=3
 PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
@@ -59,12 +59,12 @@ PLAN-ROOT SINK
 |  in pipelines: 00(GETNEXT), 01(OPEN)
 |
 |--01:SCAN KUDU [functional_kudu.alltypestiny b]
-|     mem-estimate=0B mem-reservation=0B thread-reservation=1
+|     mem-estimate=2.25MB mem-reservation=0B thread-reservation=1
 |     tuple-ids=1 row-size=22B cardinality=8
 |     in pipelines: 01(GETNEXT)
 |
 00:SCAN KUDU [functional_kudu.alltypes a]
-   mem-estimate=0B mem-reservation=0B thread-reservation=1
+   mem-estimate=2.25MB mem-reservation=0B thread-reservation=1
    tuple-ids=0 row-size=22B cardinality=7300
    in pipelines: 00(GETNEXT)
 ====
@@ -76,7 +76,7 @@ where a.tinyint_col = b.bigint_col
     and cast(a.float_col as double) = b.double_col
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=11.94MB mem-reservation=1.94MB 
thread-reservation=3
+|  Per-Host Resources: mem-estimate=14.94MB mem-reservation=1.94MB 
thread-reservation=3
 PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
@@ -95,13 +95,13 @@ PLAN-ROOT SINK
 |  in pipelines: 00(GETNEXT), 01(OPEN)
 |
 |--01:SCAN KUDU [functional_kudu.alltypestiny b]
-|     mem-estimate=0B mem-reservation=0B thread-reservation=1
+|     mem-estimate=3.00MB mem-reservation=0B thread-reservation=1
 |     tuple-ids=1 row-size=34B cardinality=8
 |     in pipelines: 01(GETNEXT)
 |
 00:SCAN KUDU [functional_kudu.alltypes a]
    runtime filters: RF007[min_max] -> a.tinyint_col
-   mem-estimate=0B mem-reservation=0B thread-reservation=1
+   mem-estimate=3.00MB mem-reservation=0B thread-reservation=1
    tuple-ids=0 row-size=26B cardinality=7300
    in pipelines: 00(GETNEXT)
 ====
@@ -130,7 +130,7 @@ PLAN-ROOT SINK
 |  in pipelines: 00(GETNEXT), 02(OPEN)
 |
 |--02:SCAN KUDU [functional_kudu.alltypes c]
-|     mem-estimate=0B mem-reservation=0B thread-reservation=1
+|     mem-estimate=768.00KB mem-reservation=0B thread-reservation=1
 |     tuple-ids=2 row-size=4B cardinality=7300
 |     in pipelines: 02(GETNEXT)
 |
@@ -143,7 +143,7 @@ PLAN-ROOT SINK
 |  in pipelines: 00(GETNEXT), 01(OPEN)
 |
 |--01:SCAN HDFS [functional_parquet.alltypes b]
-|     partitions=24/24 files=24 size=188.92KB
+|     partitions=24/24 files=24 size=199.69KB
 |     runtime filters: RF000[bloom] -> b.int_col
 |     stored statistics:
 |       table: rows=unavailable size=unavailable
@@ -156,7 +156,7 @@ PLAN-ROOT SINK
 |
 00:SCAN KUDU [functional_kudu.alltypes a]
    runtime filters: RF001[min_max] -> a.int_col, RF003[min_max] -> a.int_col
-   mem-estimate=0B mem-reservation=0B thread-reservation=1
+   mem-estimate=768.00KB mem-reservation=0B thread-reservation=1
    tuple-ids=0 row-size=4B cardinality=7300
    in pipelines: 00(GETNEXT)
 ====

http://git-wip-us.apache.org/repos/asf/impala/blob/3fabc2de/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
----------------------------------------------------------------------
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
 
b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
index 6dbeb73..6eb42ef 100644
--- 
a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
@@ -5396,3 +5396,77 @@ PLAN-ROOT SINK
    tuple-ids=0 row-size=3B cardinality=11000
    in pipelines: 00(GETNEXT)
 ====
+# Kudu Scan
+select * from functional_kudu.alltypes
+---- PLAN
+Max Per-Host Resource Reservation: Memory=0B Threads=2
+Per-Host Resource Estimates: Memory=10MB
+Codegen disabled by planner
+
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=4.88MB mem-reservation=0B 
thread-reservation=2
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+00:SCAN KUDU [functional_kudu.alltypes]
+   mem-estimate=4.88MB mem-reservation=0B thread-reservation=1
+   tuple-ids=0 row-size=97B cardinality=7300
+   in pipelines: 00(GETNEXT)
+====
+# Kudu Scan of single column
+select int_col from functional_kudu.alltypes
+---- PLAN
+Max Per-Host Resource Reservation: Memory=0B Threads=2
+Per-Host Resource Estimates: Memory=10MB
+Codegen disabled by planner
+
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=384.00KB mem-reservation=0B 
thread-reservation=2
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+00:SCAN KUDU [functional_kudu.alltypes]
+   mem-estimate=384.00KB mem-reservation=0B thread-reservation=1
+   tuple-ids=0 row-size=4B cardinality=7300
+   in pipelines: 00(GETNEXT)
+====
+# Kudu Scan count(*)
+select count(*) from functional_kudu.alltypes
+---- PLAN
+Max Per-Host Resource Reservation: Memory=0B Threads=2
+Per-Host Resource Estimates: Memory=10MB
+Codegen disabled by planner
+
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=10.00MB mem-reservation=0B 
thread-reservation=2
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  mem-estimate=10.00MB mem-reservation=0B spill-buffer=2.00MB 
thread-reservation=0
+|  tuple-ids=1 row-size=8B cardinality=1
+|  in pipelines: 01(GETNEXT), 00(OPEN)
+|
+00:SCAN KUDU [functional_kudu.alltypes]
+   mem-estimate=0B mem-reservation=0B thread-reservation=1
+   tuple-ids=0 row-size=0B cardinality=7300
+   in pipelines: 00(GETNEXT)
+====
+# Kudu Scan
+select * from tpch_kudu.nation
+---- PLAN
+Max Per-Host Resource Reservation: Memory=0B Threads=2
+Per-Host Resource Estimates: Memory=10MB
+Codegen disabled by planner
+
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=1.50MB mem-reservation=0B 
thread-reservation=2
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+00:SCAN KUDU [tpch_kudu.nation]
+   mem-estimate=1.50MB mem-reservation=0B thread-reservation=1
+   tuple-ids=0 row-size=117B cardinality=25
+   in pipelines: 00(GETNEXT)
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/3fabc2de/testdata/workloads/functional-planner/queries/PlannerTest/tpch-kudu.test
----------------------------------------------------------------------
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-kudu.test 
b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-kudu.test
index 3de3714..2a5cbd7 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-kudu.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-kudu.test
@@ -82,7 +82,7 @@ order by
 limit 100
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=18.31MB Threads=10
-Per-Host Resource Estimates: Memory=24MB
+Per-Host Resource Estimates: Memory=49MB
 PLAN-ROOT SINK
 |
 18:TOP-N [LIMIT=100]
@@ -178,7 +178,7 @@ order by
 limit 10
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=12.38MB Threads=4
-Per-Host Resource Estimates: Memory=20MB
+Per-Host Resource Estimates: Memory=26MB
 PLAN-ROOT SINK
 |
 06:TOP-N [LIMIT=10]
@@ -231,7 +231,7 @@ order by
   o_orderpriority
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=13.94MB Threads=3
-Per-Host Resource Estimates: Memory=22MB
+Per-Host Resource Estimates: Memory=42MB
 PLAN-ROOT SINK
 |
 04:SORT
@@ -279,7 +279,7 @@ order by
   revenue desc
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=15.38MB Threads=7
-Per-Host Resource Estimates: Memory=23MB
+Per-Host Resource Estimates: Memory=32MB
 PLAN-ROOT SINK
 |
 12:SORT
@@ -340,7 +340,7 @@ where
   and l_quantity < 24
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=0B Threads=2
-Per-Host Resource Estimates: Memory=10MB
+Per-Host Resource Estimates: Memory=16MB
 PLAN-ROOT SINK
 |
 01:AGGREGATE [FINALIZE]
@@ -390,7 +390,7 @@ order by
   l_year
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=76.69MB Threads=7
-Per-Host Resource Estimates: Memory=83MB
+Per-Host Resource Estimates: Memory=90MB
 PLAN-ROOT SINK
 |
 12:SORT
@@ -478,7 +478,7 @@ order by
   o_year
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=12.56MB Threads=9
-Per-Host Resource Estimates: Memory=18MB
+Per-Host Resource Estimates: Memory=33MB
 PLAN-ROOT SINK
 |
 16:SORT
@@ -574,7 +574,7 @@ order by
   o_year desc
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=79.50MB Threads=7
-Per-Host Resource Estimates: Memory=104MB
+Per-Host Resource Estimates: Memory=118MB
 PLAN-ROOT SINK
 |
 12:SORT
@@ -656,7 +656,7 @@ order by
 limit 20
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=44.44MB Threads=5
-Per-Host Resource Estimates: Memory=61MB
+Per-Host Resource Estimates: Memory=71MB
 PLAN-ROOT SINK
 |
 08:TOP-N [LIMIT=20]
@@ -726,7 +726,7 @@ order by
   value desc
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=9.69MB Threads=7
-Per-Host Resource Estimates: Memory=28MB
+Per-Host Resource Estimates: Memory=38MB
 PLAN-ROOT SINK
 |
 13:SORT
@@ -807,7 +807,7 @@ order by
   l_shipmode
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=35.94MB Threads=3
-Per-Host Resource Estimates: Memory=46MB
+Per-Host Resource Estimates: Memory=49MB
 PLAN-ROOT SINK
 |
 04:SORT
@@ -891,7 +891,7 @@ where
   and l_shipdate < '1995-10-01'
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=17.00MB Threads=3
-Per-Host Resource Estimates: Memory=27MB
+Per-Host Resource Estimates: Memory=33MB
 PLAN-ROOT SINK
 |
 03:AGGREGATE [FINALIZE]
@@ -940,7 +940,7 @@ order by
   s_suppkey
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=15.88MB Threads=4
-Per-Host Resource Estimates: Memory=32MB
+Per-Host Resource Estimates: Memory=42MB
 PLAN-ROOT SINK
 |
 08:SORT
@@ -1056,7 +1056,7 @@ where
   )
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=8.62MB Threads=4
-Per-Host Resource Estimates: Memory=22MB
+Per-Host Resource Estimates: Memory=24MB
 PLAN-ROOT SINK
 |
 06:AGGREGATE [FINALIZE]
@@ -1121,7 +1121,7 @@ order by
 limit 100
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=81.25MB Threads=5
-Per-Host Resource Estimates: Memory=155MB
+Per-Host Resource Estimates: Memory=158MB
 PLAN-ROOT SINK
 |
 09:TOP-N [LIMIT=100]
@@ -1196,7 +1196,7 @@ where
   )
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=2.88MB Threads=3
-Per-Host Resource Estimates: Memory=13MB
+Per-Host Resource Estimates: Memory=22MB
 PLAN-ROOT SINK
 |
 03:AGGREGATE [FINALIZE]
@@ -1253,7 +1253,7 @@ order by
   s_name
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=50.81MB Threads=6
-Per-Host Resource Estimates: Memory=51MB
+Per-Host Resource Estimates: Memory=60MB
 PLAN-ROOT SINK
 |
 10:SORT
@@ -1339,7 +1339,7 @@ order by
 limit 100
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=10.56MB Threads=7
-Per-Host Resource Estimates: Memory=12MB
+Per-Host Resource Estimates: Memory=71MB
 PLAN-ROOT SINK
 |
 12:TOP-N [LIMIT=100]
@@ -1426,7 +1426,7 @@ order by
   cntrycode
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=13.94MB Threads=4
-Per-Host Resource Estimates: Memory=22MB
+Per-Host Resource Estimates: Memory=31MB
 PLAN-ROOT SINK
 |
 07:SORT

Reply via email to