Riza Suminto has uploaded a new patch set (#45) to the change originally
created by Qifan Chen. ( http://gerrit.cloudera.org:8080/19033 )
Change subject: IMPALA-11604 Planner changes for CPU usage
......................................................................
IMPALA-11604 Planner changes for CPU usage
This patch augments IMPALA-10992 by establishing an infrastructure to
allow the weighted total amount of data to process to be used as a new
factor in the definition and selection of an executor group. At the
basis of the CPU costing model, we define ProcessingCost as a cost for a
distinct PlanNode / DataSink / PlanFragment to process its input rows
globally across all of its instances. The costing algorithm then tries
to adjust the number of instances for each fragment by considering their
production-consumption ratio, and then finally returns a number
representing an ideal CPU core count required for a query to run
efficiently. A more detailed explanation of the CPU costing algorithm
can be found in the four steps below.
I. Compute ProcessingCost for each plan node and data sink.
ProcessingCost of a PlanNode/DataSink is a weighted amount of data
processed by that node/sink. The basic ProcessingCost is computed with a
general formula as follows.
ProcessingCost is a pair: PC(D, N), where D = I * (C + M)
where D is the weighted amount of data processed
I is the input cardinality
C is the expression evaluation cost per row.
Set to total weight of expression evaluation in node/sink.
M is a materialization cost per row.
Only used by scan and exchange node. Otherwise, 0.
N is the number of instances.
Default to D / MIN_COST_PER_THREAD (1 million), but
is fixed for a certain node/sink and adjustable in step III.
In this patch, the weight of each expression evaluation is set to a
constant of 1. A description of the computation for each kind of
PlanNode/DataSink is given below.
01. AggregationNode:
Each AggregateInfo has its C as a sum of grouping expression and
aggregate expression and then assigned a single ProcessingCost
individually. These ProcessingCosts then summed to be the Aggregation
node's ProcessingCost;
02. AnalyticEvalNode:
C is the sum of the evaluation costs for analytic functions;
03. CardinalityCheckNode:
Use the general formula, I = 1;
04. DataSourceScanNode:
Follow the formula from the superclass ScanNode;
05. EmptySetNode:
I = 0;
06. ExchangeNode:
M = 1 / row batch size.
A modification of the general formula when in broadcast mode:
D = D * number of receivers;
07. HashJoinNode:
probe cost = PC(I0 * C(equiJoin predicate), N) +
PC(output cardinality * C(otherJoin predicate), N)
build cost = PC(I1 * C(equi-join predicate), N)
With I0 and I1 as input cardinality of the probe and build side
accordingly. If the plan node does not have a separate build, ProcessingCost
is the sum of probe cost and build cost. Otherwise, ProcessingCost is
equal to probeCost.
08. HbaseScanNode, HdfsScanNode, and KuduScanNode:
Follow the formula from the superclass ScanNode;
09. Nested loop join node:
When the right child is not a SingularRowSrcNode:
probe cost = PC(I0 * C(equiJoin predicate), N) +
PC(output cardinality * C(otherJoin predicate), N)
build cost = PC(I1 * C(equiJoin predicate), N)
When the right child is a SingularRowSrcNode:
probe cost = PC(I0, N)
build cost = PC(I0 * I1, N)
With I0 and I1 as input cardinality of the probe and build side
accordingly. If the plan node does not have a separate build, ProcessingCost
is the sum of probe cost and build cost. Otherwise, ProcessingCost is
equal to probeCost.
10. ScanNode:
M = 1 / row batch size;
11. SelectNode:
Use the general formula;
12. SingularRowSrcNode:
Since the node is involved once per input in nested loop join, the
contribution of this node is computed in nested loop join;
13. SortNode:
C is the evaluation cost for the sort expression;
14. SubplanNode:
C is 1. I is the multiplication of the cardinality of the left and
the right child;
15. Union node:
C is the cost of result expression evaluation from all non-pass-through
children;
16. Unnest node:
I is the cardinality of the containing SubplanNode and C is 1.
17. DataStreamSink:
M = 1 / num rows per batch.
18. JoinBuildSink:
ProcessingCost is the build cost of its associated JoinNode.
29. PlanRootSink:
If result spooling is enabled, C is the cost of output expression
evaluation. Otherwise. ProcessingCost is zero.
20. TableSink:
C is the cost of output expression evaluation.
TableSink subclasses (including HBaseTableSink, HdfsTableSink, and
KuduTableSink) follow the same formula;
II. Compute the total ProcessingCost of a fragment.
A query fragment has a processingCosts_ field, a list of ProcessingCost
associated with the fragment. Each element of processingCosts_ may
solely come from a single PlanNode or sum of adjacent PlanNode costs.
The costing algorithm split the fragment into several segments,
separated by blocking PlanNode. PlanNodes or DataSink that belong to the
same segment will have their ProcessingCost summed. The first cost in
processingCosts_ (usually associated with ScanNode or ExchangeNode) is
called Input ProcessingCost, while the last cost in
processingCosts_ (associated with DataSink) is called
OutputProcessingCost. If a fragment does not contain a blocking
PlanNode, then the list ProcessingCost is a sum of all ProcessingCost of
its PlanNodes and DataSink and it is both an Input and Output
ProcessingCost. A fragment's Output ProcessingCost is being used in the
next step III to adjust the parallelism of the parent fragment.
This is implemented in PlanFragment.computeFragmentProcessingCost() and
PlanFragment.collectProcessingCostHelper().
III. Compute the effective parallelism of query fragments.
The costing algorithm recursively walks the query plan tree visiting
PlanFragment from the leaf and going up to the root. Upon visiting a
PlanFragment, the costing algorithms will try to adjust the number of
instances (effective parallelism) of query fragments by comparing Output
ProcessingCost and production-consumption rate between adjacent segments
within a fragment from step II. To simplify this initial implementation,
the parallelism of PlanFragment containing EmptySetNode, UnionNode, or
ScanNode will remain unchanged (follow MT_DOP).
This step is implemented at PlanFragment.traverseEffectiveParallelism().
IV. Compute the effective parallelism of the query.
Effective parallelism of a query is the maximum upper bound of CPU core
count that can parallelly work on a query when considering the
overlapping between fragment execution and blocking operators. We
compute this in a similar recursive walk as step III. Adjacent fragments
that are not blocking will have their adjusted instance count summed,
meaning that both fragments can run in parallel and should be assigned 1
core per fragment instance. The summation stops upon visiting a blocking
PlanFragment. At blocking PlanFragment, the maximum between current
CoreRequirement vs blocking-child-subtrees CoreRequirements is taken and
the recursive algorithm continues to the next ancestor PlanFragment.
This step is implemented at Planner.computeBlockingAwareCores() and
PlanFragment.traverseBlockingAwareCores().
The resulting CoreRequirement at the root PlanFragment is then taken as
the ideal CPU core requirement / effective parallelism of the query.
This number will be compared against the total CPU count of an Impala
executor group to determine if it fits to run in that executor group set
or not. A backend flag query_cpu_requirement_divisor is added to help
scale down/up the CPU core requirement of a query if needed.
Three query options are added to control this CPU costing algorithm.
1. COMPUTE_PROCESSING_COST
Control whether to enable this CPU costing algorithm or not.
Must also set MT_DOP > 0 for this query option to take effect.
2. PROCESSING_COST_MAX_THREADS
Control the maximum number of fragment instances (threads) that the
costing algorithm is allowed to adjust.
3. PROCESSING_COST_ALLOW_THREAD_INCREMENT
Control whether the costing algorithm is allowed to increase the
instance count of a fragment beyond PROCESSING_COST_MAX_THREADS due
to the large estimated workload. It is suggested to keep this to
false until the min_input_rows_per_thread backend flag has been
finely tuned.
This patch also adds 3 backend flags to tune the algorithm:
query_cpu_requirement_divisor, processing_cost_equal_expr_weight, and
min_input_rows_per_thread. Their description is available in
backend-gflag-util.cc.
As an example, the following are additional ProcessingCost information
printed to coordinator log for Q3, Q12, and Q15 ran on TPCDS 10GB scale,
3 executors, MT_DOP=4, and PROCESSING_COST_MAX_THREADS=4.
Q3
CoreRequirement={total=12 trace=F00:12} ProcessingCost=86939929
Q12
CoreRequirement={total=12 trace=F00:12} ProcessingCost=62625857
Q15
CoreRequirement={total=15 trace=N07:3+F00:12} ProcessingCost=69481834
Testing:
- Add TestTpcdsQueryWithProcessingCost, which is a similar run of
TestTpcdsQuery, but with COMPUTE_PROCESSING_COST=1 and MT_DOP=4.
Setting log level TRACE for PlanFragment and manually running
TestTpcdsQueryWithProcessingCost in minicluster shows several fragment
instance count reduction from 12 to either of 9, 6, or 3 in
coordinator log.
- Add PlannerTest#testProcessingCost
Adjusted fragment count is indicated by "(adjusted from 12)" in the
query profile.
Co-authored-by: Riza Suminto <[email protected]>
Change-Id: If32dc770dfffcdd0be2b5555a789a7720952c68a
---
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/query-options.cc
M be/src/service/query-options.h
M be/src/util/backend-gflag-util.cc
M common/thrift/BackendGflags.thrift
M common/thrift/ImpalaService.thrift
M common/thrift/Planner.thrift
M common/thrift/Query.thrift
M fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java
M fe/src/main/java/org/apache/impala/analysis/SortInfo.java
M fe/src/main/java/org/apache/impala/planner/AggregationNode.java
M fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
A fe/src/main/java/org/apache/impala/planner/BaseProcessingCost.java
A fe/src/main/java/org/apache/impala/planner/BroadcastProcessingCost.java
M fe/src/main/java/org/apache/impala/planner/CardinalityCheckNode.java
A fe/src/main/java/org/apache/impala/planner/CoreRequirement.java
M fe/src/main/java/org/apache/impala/planner/DataSink.java
M fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
M fe/src/main/java/org/apache/impala/planner/DataStreamSink.java
M fe/src/main/java/org/apache/impala/planner/EmptySetNode.java
M fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
M fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
M fe/src/main/java/org/apache/impala/planner/HBaseTableSink.java
M fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
M fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
M fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
M fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java
M fe/src/main/java/org/apache/impala/planner/JoinNode.java
M fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
M fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
A fe/src/main/java/org/apache/impala/planner/MultipleProcessingCost.java
M fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java
M fe/src/main/java/org/apache/impala/planner/PlanFragment.java
M fe/src/main/java/org/apache/impala/planner/PlanNode.java
M fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
M fe/src/main/java/org/apache/impala/planner/Planner.java
A fe/src/main/java/org/apache/impala/planner/ProcessingCost.java
M fe/src/main/java/org/apache/impala/planner/ScanNode.java
M fe/src/main/java/org/apache/impala/planner/SelectNode.java
M fe/src/main/java/org/apache/impala/planner/SingularRowSrcNode.java
M fe/src/main/java/org/apache/impala/planner/SortNode.java
M fe/src/main/java/org/apache/impala/planner/SubplanNode.java
A fe/src/main/java/org/apache/impala/planner/SumProcessingCost.java
M fe/src/main/java/org/apache/impala/planner/TableSink.java
M fe/src/main/java/org/apache/impala/planner/UnionNode.java
M fe/src/main/java/org/apache/impala/planner/UnnestNode.java
M fe/src/main/java/org/apache/impala/service/BackendConfig.java
M fe/src/main/java/org/apache/impala/service/Frontend.java
M fe/src/main/java/org/apache/impala/util/ExprUtil.java
M fe/src/test/java/org/apache/impala/planner/PlannerTest.java
A
testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test
M tests/query_test/test_tpcds_queries.py
53 files changed, 10,296 insertions(+), 55 deletions(-)
git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/33/19033/45
--
To view, visit http://gerrit.cloudera.org:8080/19033
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: If32dc770dfffcdd0be2b5555a789a7720952c68a
Gerrit-Change-Number: 19033
Gerrit-PatchSet: 45
Gerrit-Owner: Qifan Chen <[email protected]>
Gerrit-Reviewer: Impala Public Jenkins <[email protected]>
Gerrit-Reviewer: Kurt Deschler <[email protected]>
Gerrit-Reviewer: Qifan Chen <[email protected]>
Gerrit-Reviewer: Riza Suminto <[email protected]>
Gerrit-Reviewer: Wenzhe Zhou <[email protected]>