Tim Armstrong has uploaded this change for review. ( http://gerrit.cloudera.org:8080/16022 )


Change subject: IMPALA-9814: fix mt_dop parallelism for analytic fns
......................................................................

IMPALA-9814: fix mt_dop parallelism for analytic fns

This replaces getNumNodes() with getNumInstances() in AnalyticPlanner,
which fixes some cases of underparallelisation with analytic functions.
Here is an example query that is underparallelised when there are 3
backends with mt_dop=3: the plan hash-partitions only on ss_store_sk, a
column with NDV=6, so no more than about 6 of the 3 x 3 = 9 fragment
instances can receive rows.

set mt_dop=3;
explain select count(*) over (partition by ss_addr_sk, ss_store_sk),
               count(*) over (partition by ss_sold_date_sk, ss_store_sk)
from tpcds_parquet.store_sales;

Before:
+---------------------------------------------------------------------------+
| Explain String                                                            |
+---------------------------------------------------------------------------+
| Max Per-Host Resource Reservation: Memory=61.50MB Threads=7               |
| Per-Host Resource Estimates: Memory=148MB                                 |
|                                                                           |
| PLAN-ROOT SINK                                                            |
| |                                                                         |
| 06:EXCHANGE [UNPARTITIONED]                                               |
| |                                                                         |
| 04:ANALYTIC                                                               |
| |  functions: count(*)                                                    |
| |  partition by: ss_sold_date_sk, ss_store_sk                             |
| |  row-size=28B cardinality=2.88M                                         |
| |                                                                         |
| 03:SORT                                                                   |
| |  order by: ss_sold_date_sk ASC NULLS FIRST, ss_store_sk ASC NULLS FIRST |
| |  row-size=20B cardinality=2.88M                                         |
| |                                                                         |
| 02:ANALYTIC                                                               |
| |  functions: count(*)                                                    |
| |  partition by: ss_addr_sk, ss_store_sk                                  |
| |  row-size=20B cardinality=2.88M                                         |
| |                                                                         |
| 01:SORT                                                                   |
| |  order by: ss_addr_sk ASC NULLS FIRST, ss_store_sk ASC NULLS FIRST      |
| |  row-size=12B cardinality=2.88M                                         |
| |                                                                         |
| 05:EXCHANGE [HASH(ss_store_sk)]                                           |
| |                                                                         |
| 00:SCAN HDFS [tpcds_parquet.store_sales]                                  |
|    HDFS partitions=1824/1824 files=1824 size=196.96MB                     |
|    row-size=12B cardinality=2.88M                                         |
+---------------------------------------------------------------------------+

After, each analytic stage is hash-partitioned on both of its partition columns:
+---------------------------------------------------------------------------+
| Explain String                                                            |
+---------------------------------------------------------------------------+
| Max Per-Host Resource Reservation: Memory=61.50MB Threads=10              |
| Per-Host Resource Estimates: Memory=209MB                                 |
|                                                                           |
| PLAN-ROOT SINK                                                            |
| |                                                                         |
| 07:EXCHANGE [UNPARTITIONED]                                               |
| |                                                                         |
| 04:ANALYTIC                                                               |
| |  functions: count(*)                                                    |
| |  partition by: ss_sold_date_sk, ss_store_sk                             |
| |  row-size=28B cardinality=2.88M                                         |
| |                                                                         |
| 03:SORT                                                                   |
| |  order by: ss_sold_date_sk ASC NULLS FIRST, ss_store_sk ASC NULLS FIRST |
| |  row-size=20B cardinality=2.88M                                         |
| |                                                                         |
| 06:EXCHANGE [HASH(ss_sold_date_sk,ss_store_sk)]                           |
| |                                                                         |
| 02:ANALYTIC                                                               |
| |  functions: count(*)                                                    |
| |  partition by: ss_addr_sk, ss_store_sk                                  |
| |  row-size=20B cardinality=2.88M                                         |
| |                                                                         |
| 01:SORT                                                                   |
| |  order by: ss_addr_sk ASC NULLS FIRST, ss_store_sk ASC NULLS FIRST      |
| |  row-size=12B cardinality=2.88M                                         |
| |                                                                         |
| 05:EXCHANGE [HASH(ss_addr_sk,ss_store_sk)]                                |
| |                                                                         |
| 00:SCAN HDFS [tpcds_parquet.store_sales]                                  |
|    HDFS partitions=1824/1824 files=1824 size=196.96MB                     |
|    row-size=12B cardinality=2.88M                                         |
+---------------------------------------------------------------------------+
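
The difference between the two plans comes down to which degree of
parallelism the NDV of a candidate partition column is compared against.
The following is a minimal sketch of that comparison, not the actual
AnalyticPlanner code: the class PartitionChoiceSketch and its methods are
hypothetical names, and it assumes that every backend runs mt_dop fragment
instances and that a column set is considered good enough when its NDV is
at least the degree of parallelism.

```java
// Hypothetical illustration only; these names do not exist in Impala.
final class PartitionChoiceSketch {
  // Assumption: each backend runs mt_dop instances of the fragment,
  // and mt_dop=0 behaves like a single instance per backend.
  static int numInstances(int numBackends, int mtDop) {
    return numBackends * Math.max(1, mtDop);
  }

  // Assumption: partitioning on a column set is treated as sufficient
  // when it has at least one distinct value per parallel unit.
  static boolean canSaturate(long ndv, int parallelism) {
    return ndv >= parallelism;
  }

  public static void main(String[] args) {
    long ndvStoreSk = 6;                 // NDV of ss_store_sk in the example
    int nodes = 3;                       // what getNumNodes() would report
    int instances = numInstances(3, 3);  // 3 backends with mt_dop=3

    // Compared against nodes, ss_store_sk alone looks sufficient.
    System.out.println("vs nodes: " + canSaturate(ndvStoreSk, nodes));
    // Compared against instances, it does not.
    System.out.println("vs instances: " + canSaturate(ndvStoreSk, instances));
  }
}
```

Under these assumptions, ss_store_sk alone passes the check against 3
nodes but fails it against 9 instances, which is consistent with the
"After" plan partitioning each stage on its full set of partition columns.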

Testing:
Ran exhaustive tests.

Change-Id: Ia88d9494c566b984c18f4b051c2d76f389078dd9
---
M fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java
M fe/src/test/java/org/apache/impala/planner/PlannerTest.java
A testdata/workloads/functional-planner/queries/PlannerTest/analytic-fns-mt-dop.test
3 files changed, 144 insertions(+), 13 deletions(-)



  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/22/16022/1
--
To view, visit http://gerrit.cloudera.org:8080/16022
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ia88d9494c566b984c18f4b051c2d76f389078dd9
Gerrit-Change-Number: 16022
Gerrit-PatchSet: 1
Gerrit-Owner: Tim Armstrong <[email protected]>
