[Impala-ASF-CR] IMPALA-11604 (part 2): Compute Effective Parallelism of Query
Riza Suminto has submitted this change and it was merged. ( http://gerrit.cloudera.org:8080/19593 )

Change subject: IMPALA-11604 (part 2): Compute Effective Parallelism of Query

IMPALA-11604 (part 2): Compute Effective Parallelism of Query

Part 1 of IMPALA-11604 implements the ProcessingCost model for each PlanNode and DataSink. This second part builds on top of the ProcessingCost model. It adjusts the number of instances of each fragment based on the fragment's production-consumption ratio. It then returns a number representing the ideal CPU core count a query needs to run efficiently. The three steps below explain the CPU costing algorithm in more detail.

I. Compute the total ProcessingCost of a fragment.

The costing algorithm splits a query fragment into several segments, divided at blocking PlanNode/DataSink boundaries. Each fragment segment is a subtree of PlanNodes/DataSink in the fragment with a DataSink or blocking PlanNode as its root and non-blocking leaves. All other nodes in the segment are non-blocking. The ProcessingCosts of the PlanNodes or DataSink that belong to the same segment are summed. A new CostingSegment class is added to represent such a segment.

A fragment that has a blocking PlanNode or blocking DataSink is called a blocking fragment. Currently, only JoinBuildSink is considered a blocking DataSink. A fragment without any blocking nodes is called a non-blocking fragment. Step III discusses blocking and non-blocking fragments further.

Take the following fragment plan as an example. It is blocking because it has three blocking PlanNodes: 12:AGGREGATE, 06:SORT, and 08:TOP-N.
  F03:PLAN FRAGMENT [HASH(i_class)] hosts=3 instances=6 (adjusted from 12)
  fragment-costs=[34974657, 2159270, 23752870, 22]
  08:TOP-N [LIMIT=100]
  |  cost=900
  |
  07:ANALYTIC
  |  cost=23751970
  |
  06:SORT
  |  cost=2159270
  |
  12:AGGREGATE [FINALIZE]
  |  cost=34548320
  |
  11:EXCHANGE [HASH(i_class)]
     cost=426337

In the bottom-up direction, there are four segments in F03:
  Blocking segment 1: (11:EXCHANGE, 12:AGGREGATE)
  Blocking segment 2: 06:SORT
  Blocking segment 3: (07:ANALYTIC, 08:TOP-N)
  Non-blocking segment 4: DataStreamSink of F03

Therefore we have:
  PC(segment 1) = 426337 + 34548320 = 34974657
  PC(segment 2) = 2159270
  PC(segment 3) = 23751970 + 900 = 23752870
  PC(segment 4) = 22

These per-segment costs are stored in a CostingSegment tree rooted at PlanFragment.rootSegment_. After the post-order traversal they are [34974657, 2159270, 23752870, 22], respectively. This is implemented in PlanFragment.computeCostingSegment() and PlanFragment.collectCostingSegmentHelper().

II. Compute the effective degree of parallelism (EDoP) of fragments.

The costing algorithm walks the PlanFragments of the query plan tree in post-order. When it visits a PlanFragment, it attempts to adjust that fragment's number of instances (its effective parallelism). It does so by comparing two things from step I: the ProcessingCost of the last segment of the fragment's child, and the production-consumption rate between adjacent segments. To simplify this initial implementation, the parallelism of a PlanFragment containing an EmptySetNode, UnionNode, or ScanNode remains unchanged (it follows MT_DOP). This step is implemented in PlanFragment.traverseEffectiveParallelism().

III. Compute the EDoP of the query.

The effective parallelism of a query is the upper bound on the number of CPU cores that can work on the query in parallel, taking into account the overlap between fragment execution and blocking operators. We compute it in a post-order traversal similar to step II. The traversal splits the query tree into blocking fragment subtrees, similarly to how step I splits a fragment into segments.
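The segment costing of step I can be illustrated with a small Python sketch. The Node and Segment classes and the build_segment and post_order_costs functions are hypothetical stand-ins for this illustration, not Impala's CostingSegment or PlanFragment API. The sketch assumes each node carries its ProcessingCost as a plain integer plus a boolean blocking flag. Run on F03, it should reproduce the four segments and the post-order costs listed above.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Node:
    """Hypothetical PlanNode/DataSink stand-in carrying its ProcessingCost."""
    name: str
    cost: int
    blocking: bool = False
    children: List["Node"] = field(default_factory=list)


@dataclass
class Segment:
    """Hypothetical stand-in for a costing segment (not Impala's class)."""
    cost: int = 0
    members: List[str] = field(default_factory=list)
    children: List["Segment"] = field(default_factory=list)


def build_segment(root: Node) -> Segment:
    """Sum the costs of `root` and its non-blocking descendants.

    The root is always treated as a segment root (a sink or blocking node);
    every blocking child below it starts a new child segment.
    """
    seg = Segment()

    def absorb(node: Node) -> None:
        seg.cost += node.cost
        seg.members.append(node.name)
        for child in node.children:
            if child.blocking:
                seg.children.append(build_segment(child))
            else:
                absorb(child)

    absorb(root)
    return seg


def post_order_costs(seg: Segment) -> List[int]:
    """List segment costs with child segments before their parent."""
    costs: List[int] = []
    for child in seg.children:
        costs.extend(post_order_costs(child))
    costs.append(seg.cost)
    return costs


# F03 from the example: the DataStreamSink is the root of the fragment.
exchange = Node("11:EXCHANGE", 426337)
aggregate = Node("12:AGGREGATE", 34548320, blocking=True, children=[exchange])
sort = Node("06:SORT", 2159270, blocking=True, children=[aggregate])
analytic = Node("07:ANALYTIC", 23751970, children=[sort])
top_n = Node("08:TOP-N", 900, blocking=True, children=[analytic])
sink = Node("DataStreamSink", 22, children=[top_n])

root_segment = build_segment(sink)
print(post_order_costs(root_segment))  # [34974657, 2159270, 23752870, 22]
```

The sink always roots the top segment. That is why a fourth, non-blocking segment exists even though DataStreamSink itself is not blocking.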
The following is an example of a query plan from TPCDS-Q12.

  F04:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
  PLAN-ROOT SINK
  |
  13:MERGING-EXCHANGE [UNPARTITIONED]
  |
  F03:PLAN FRAGMENT [HASH(i_class)] hosts=3 instances=3 (adjusted from 12)
  08:TOP-N [LIMIT=100]
  |
  07:ANALYTIC
  |
  06:SORT
  |
  12:AGGREGATE [FINALIZE]
  |
  11:EXCHANGE [HASH(i_class)]
  |
  F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=12
  05:AGGREGATE [STREAMING]
  |
  04:HASH JOIN [INNER JOIN, BROADCAST]
  |
  |--F05:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
  |  JOIN BUILD
  |  |
  |  10:EXCHANGE [BROADCAST]
  |  |
  |  F02:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
  |  02:SCAN HDFS [tpcds10_parquet.date_dim, RANDOM]
  |
  03:HASH JOIN [INNER JOIN, BROADCAST]
  |
  |--F06:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
  |  JOIN BUILD
  |  |
  |  09:EXCHANGE [BROADCAST]
  |  |
  |  F01:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
  |  01:SCAN HDFS [tpcds10_parquet.item, RANDOM]
  |
  00:SCAN HDFS [tpcds10_parquet.web_sales, RANDOM]

A blocking fragment is a fragment that has a blocking PlanNode or blocking DataSink in it. The costing algorithm splits the query plan tree into blocking subtrees, divided at blocking fragment boundaries.
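The split into blocking subtrees can be sketched in Python in the same way as the segments, one level up: over fragments instead of plan nodes. The sketch assumes, by analogy with step I, two rules:

- Each subtree is rooted at a blocking fragment, or at the root fragment.
- A subtree absorbs non-blocking descendant fragments until it reaches another blocking fragment.

The Fragment class and split_blocking_subtrees function are hypothetical. Some blocking flags follow from the text above. F03 has blocking PlanNodes. F05 and F06 each have a JOIN BUILD sink, which we take to be a JoinBuildSink. Marking F00, F01, F02, and F04 as non-blocking is an assumption made only for this illustration. The sketch groups fragments; it does not compute the query's EDoP.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Fragment:
    """Hypothetical fragment: name, blocking flag, and input fragments."""
    name: str
    blocking: bool
    children: List["Fragment"] = field(default_factory=list)


def split_blocking_subtrees(root: Fragment) -> List[List[str]]:
    """Group fragments into subtrees cut at blocking fragments.

    Each subtree lists its root first. Subtrees are emitted in post-order,
    so a subtree appears after every subtree nested beneath it.
    """
    subtrees: List[List[str]] = []

    def build(subtree_root: Fragment) -> None:
        members: List[str] = []

        def absorb(frag: Fragment) -> None:
            members.append(frag.name)
            for child in frag.children:
                if child.blocking:
                    build(child)  # a blocking child roots its own subtree
                else:
                    absorb(child)

        absorb(subtree_root)
        subtrees.append(members)

    build(root)
    return subtrees


# Fragment tree of TPCDS-Q12; flags of F00, F01, F02, F04 are ASSUMED.
f01 = Fragment("F01", blocking=False)
f02 = Fragment("F02", blocking=False)
f05 = Fragment("F05", blocking=True, children=[f02])
f06 = Fragment("F06", blocking=True, children=[f01])
f00 = Fragment("F00", blocking=False, children=[f05, f06])
f03 = Fragment("F03", blocking=True, children=[f00])
f04 = Fragment("F04", blocking=False, children=[f03])

print(split_blocking_subtrees(f04))
# [['F05', 'F02'], ['F06', 'F01'], ['F03', 'F00'], ['F04']]
```

Under these assumptions, F00 is grouped with F03 because the first blocking fragment above it is F03. The two broadcast build sides each form their own subtree.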
[Impala-ASF-CR] IMPALA-11604 (part 2): Compute Effective Parallelism of Query
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/19593 )

Change subject: IMPALA-11604 (part 2): Compute Effective Parallelism of Query

Patch Set 1: Code-Review+2

Thanks everyone! Carry two +1.

--
To view, visit http://gerrit.cloudera.org:8080/19593
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ibb2a796fdf78336e95991955d89c671ec82be62e
Gerrit-Change-Number: 19593
Gerrit-PatchSet: 1
Gerrit-Owner: Riza Suminto
Gerrit-Reviewer: Impala Public Jenkins
Gerrit-Reviewer: Kurt Deschler
Gerrit-Reviewer: Qifan Chen
Gerrit-Reviewer: Riza Suminto
Gerrit-Reviewer: Wenzhe Zhou
Gerrit-Comment-Date: Wed, 08 Mar 2023 15:31:38 +
Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-11604 (part 2): Compute Effective Parallelism of Query
Kurt Deschler has posted comments on this change. ( http://gerrit.cloudera.org:8080/19593 )

Change subject: IMPALA-11604 (part 2): Compute Effective Parallelism of Query

Patch Set 1: Code-Review+1

Gerrit-Comment-Date: Wed, 08 Mar 2023 13:53:13 +
[Impala-ASF-CR] IMPALA-11604 (part 2): Compute Effective Parallelism of Query
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/19593 )

Change subject: IMPALA-11604 (part 2): Compute Effective Parallelism of Query

Patch Set 1: Verified+1

Gerrit-Comment-Date: Tue, 07 Mar 2023 08:36:37 +
[Impala-ASF-CR] IMPALA-11604 (part 2): Compute Effective Parallelism of Query
Wenzhe Zhou has posted comments on this change. ( http://gerrit.cloudera.org:8080/19593 )

Change subject: IMPALA-11604 (part 2): Compute Effective Parallelism of Query

Patch Set 1: Code-Review+1

Gerrit-Comment-Date: Tue, 07 Mar 2023 05:43:15 +
[Impala-ASF-CR] IMPALA-11604 (part 2): Compute Effective Parallelism of Query
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/19593 )

Change subject: IMPALA-11604 (part 2): Compute Effective Parallelism of Query

Patch Set 1:

Build Successful

https://jenkins.impala.io/job/gerrit-code-review-checks/12563/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests.

Gerrit-Comment-Date: Tue, 07 Mar 2023 03:47:57 +
[Impala-ASF-CR] IMPALA-11604 (part 2): Compute Effective Parallelism of Query
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/19593 )

Change subject: IMPALA-11604 (part 2): Compute Effective Parallelism of Query

Patch Set 1:

Build started: https://jenkins.impala.io/job/gerrit-verify-dryrun/9116/ DRY_RUN=true

Gerrit-Comment-Date: Tue, 07 Mar 2023 03:32:10 +
[Impala-ASF-CR] IMPALA-11604 (part 2): Compute Effective Parallelism of Query
Riza Suminto has uploaded this change for review. ( http://gerrit.cloudera.org:8080/19593 )

Change subject: IMPALA-11604 (part 2): Compute Effective Parallelism of Query