Hello Quanlong Huang, Kurt Deschler, Csaba Ringhofer, Wenzhe Zhou, Impala 
Public Jenkins,

I'd like you to reexamine a change. Please visit

    http://gerrit.cloudera.org:8080/19807

to look at the new patch set (#4).

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................

IMPALA-12091: Control scan parallelism by its processing cost

Before this patch, Impala still relies on MT_DOP option to decide the
degree of parallelism of the scan fragment when a query runs with
COMPUTE_PROCESSING_COST=1. This patch adds the scan node's processing
cost as another consideration to raise scan parallelism beyond MT_DOP.

Scan node cost is now adjusted to also consider the number of effective
scan ranges. Each scan range is given a weight of (0.5% *
min_processing_per_thread), which roughly means that one scan node
instance can handle at most 200 scan ranges.

Query option MAX_FRAGMENT_INSTANCES_PER_NODE is repurposed as an upper
bound on scan parallelism if COMPUTE_PROCESSING_COST=true. If the number
of scan ranges is fewer than the maximum parallelism allowed by the scan
node's processing cost, that processing cost will be clamped down
to (min_processing_per_thread / number of scan ranges). Lowering
MAX_FRAGMENT_INSTANCES_PER_NODE can also clamp down the scan processing
cost in a similar way. For interior fragments, a combination of
MAX_FRAGMENT_INSTANCES_PER_NODE, PROCESSING_COST_MIN_THREADS, and the
number of available cores per node is accounted to determine maximum
fragment parallelism per node. For scan fragment, only the first two is
considered to encourage Frontend to choose a larger executor group as
needed.

Two new static state is added into exec-node.h: is_mt_fragment_ and
num_instances_per_node_. The backend code that refers to the MT_DOP
option is replaced with either is_mt_fragment_ or
num_instances_per_node_.

Two new criteria are added during effective parallelism calculation in
PlanFragment.adjustToMaxParallelism():

- If a fragment has UnionNode, its parallelism is the maximum between
  its input fragments and its collocated ScanNode's expected
  parallelism.
- If a fragment only has a single ScanNode (and no UnionNode), its
  parallelism is calculated in the same fashion as the interior fragment
  but will not be lowered anymore since it will not have any child
  fragment to compare with.

Admission control slots remain unchanged. This may cause a query to fail
admission if Planner selects scan parallelism that is higher than the
configured admission control slots value. Setting
MAX_FRAGMENT_INSTANCES_PER_NODE equal to or lower than configured
admission control slots value can help lower scan parallelism and pass
the admission controller.

The previous workaround to control scan parallelism by IMPALA-12029 is
now removed. This patch also disables IMPALA-10287 optimization if
COMPUTE_PROCESSING_COST=true. This is because IMPALA-10287 relies on a
fixed number of fragment instances in DistributedPlanner.java. However,
effective parallelism calculation is done much later and may change the
final number of instances of hash join fragment, rendering
DistributionMode selected by IMPALA-10287 inaccurate.

This patch is benchmarked using single_node_perf_run.py with the
following parameters:

args="-gen_experimental_profile=true -default_query_options="
args+="mt_dop=4,compute_processing_cost=1,processing_cost_min_threads=1 "
./bin/single_node_perf_run.py --num_impalads=3 --scale=10 \
    --workloads=tpcds --iterations=5 --table_formats=parquet/none/none \
    --impalad_args="$args" \
    
--query_names=TPCDS-Q3,TPCDS-Q14-1,TPCDS-Q14-2,TPCDS-Q23-1,TPCDS-Q23-2,TPCDS-Q49,TPCDS-Q76,TPCDS-Q78,TPCDS-Q80A
 \
    "IMPALA-12091~1" IMPALA-12091

The benchmark result is as follows:
+-----------+-------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+
| Workload  | Query       | File Format           | Avg(s) | Base Avg(s) | 
Delta(Avg) | StdDev(%)  | Base StdDev(%) | Iters | Median Diff(%) | MW Zval | 
Tval  |
+-----------+-------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+
| TPCDS(10) | TPCDS-Q23-1 | parquet / none / none | 4.62   | 4.54        |   
+1.92%   |   0.23%    |   1.59%        | 5     |   +2.32%       | 1.15    | 
2.67  |
| TPCDS(10) | TPCDS-Q14-1 | parquet / none / none | 5.82   | 5.76        |   
+1.08%   |   5.27%    |   3.89%        | 5     |   +2.04%       | 0.00    | 
0.37  |
| TPCDS(10) | TPCDS-Q23-2 | parquet / none / none | 4.65   | 4.58        |   
+1.38%   |   1.97%    |   0.48%        | 5     |   +0.81%       | 0.87    | 
1.51  |
| TPCDS(10) | TPCDS-Q49   | parquet / none / none | 1.49   | 1.48        |   
+0.46%   | * 36.02% * | * 34.95% *     | 5     |   +1.26%       | 0.58    | 
0.02  |
| TPCDS(10) | TPCDS-Q14-2 | parquet / none / none | 3.76   | 3.75        |   
+0.39%   |   1.67%    |   0.58%        | 5     |   -0.03%       | -0.58   | 
0.49  |
| TPCDS(10) | TPCDS-Q78   | parquet / none / none | 2.80   | 2.80        |   
-0.04%   |   1.32%    |   1.33%        | 5     |   -0.42%       | -0.29   | 
-0.05 |
| TPCDS(10) | TPCDS-Q80A  | parquet / none / none | 2.87   | 2.89        |   
-0.51%   |   1.33%    |   0.40%        | 5     |   -0.01%       | -0.29   | 
-0.82 |
| TPCDS(10) | TPCDS-Q3    | parquet / none / none | 0.18   | 0.19        |   
-1.29%   | * 15.26% * | * 15.87% *     | 5     |   -0.54%       | -0.87   | 
-0.13 |
| TPCDS(10) | TPCDS-Q76   | parquet / none / none | 1.08   | 1.11        |   
-2.98%   |   0.92%    |   1.70%        | 5     |   -3.99%       | -2.02   | 
-3.47 |
+-----------+-------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+

Testing:
- Pass PlannerTest.testProcessingCost
- Pass test_executor_groups.py
- Reenable test_tpcds_q51a in TestTpcdsQueryWithProcessingCost with
  MAX_FRAGMENT_INSTANCES_PER_NODE set to 5
- Pass test_tpcds_queries.py::TestTpcdsQueryWithProcessingCost
- Pass core tests

Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
---
M be/src/exec/blocking-join-node.h
M be/src/exec/data-sink.h
M be/src/exec/exec-node.cc
M be/src/exec/exec-node.h
M be/src/exec/hdfs-scanner.h
M be/src/exec/scan-node.cc
M be/src/exec/scan-node.h
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/query-options-test.cc
M be/src/service/query-options.cc
M be/src/service/query-options.h
M common/thrift/ImpalaService.thrift
M common/thrift/Planner.thrift
M common/thrift/Query.thrift
M fe/src/main/java/org/apache/impala/analysis/Analyzer.java
M fe/src/main/java/org/apache/impala/planner/AggregationNode.java
M fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
M fe/src/main/java/org/apache/impala/planner/EmptySetNode.java
M fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
M fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
M fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
M fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
M fe/src/main/java/org/apache/impala/planner/PlanFragment.java
M fe/src/main/java/org/apache/impala/planner/PlanNode.java
M fe/src/main/java/org/apache/impala/planner/Planner.java
M fe/src/main/java/org/apache/impala/planner/ProcessingCost.java
M fe/src/main/java/org/apache/impala/planner/ScanNode.java
M fe/src/main/java/org/apache/impala/planner/UnionNode.java
M fe/src/main/java/org/apache/impala/service/Frontend.java
M fe/src/test/java/org/apache/impala/planner/PlannerTest.java
M fe/src/test/resources/llama-site-3-groups.xml
M 
testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test
M tests/custom_cluster/test_executor_groups.py
M tests/query_test/test_insert.py
M tests/query_test/test_tpcds_queries.py
36 files changed, 1,338 insertions(+), 1,101 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/07/19807/4
--
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 4
Gerrit-Owner: Riza Suminto <[email protected]>
Gerrit-Reviewer: Csaba Ringhofer <[email protected]>
Gerrit-Reviewer: Impala Public Jenkins <[email protected]>
Gerrit-Reviewer: Kurt Deschler <[email protected]>
Gerrit-Reviewer: Quanlong Huang <[email protected]>
Gerrit-Reviewer: Riza Suminto <[email protected]>
Gerrit-Reviewer: Wenzhe Zhou <[email protected]>

Reply via email to