[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 58: Code-Review-2 Putting -2 for now until I manage to split the patch. -- To view, visit http://gerrit.cloudera.org:8080/19033 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 58 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Tue, 07 Mar 2023 03:10:31 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Qifan Chen has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 58: Code-Review+2 -- Gerrit-PatchSet: 58 Gerrit-Comment-Date: Tue, 07 Mar 2023 02:38:03 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 58: Verified+1 -- Gerrit-PatchSet: 58 Gerrit-Comment-Date: Mon, 06 Mar 2023 22:37:47 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 58: -Code-Review Temporarily removing the +2 votes. I'll test whether splitting this patch is feasible. -- Gerrit-PatchSet: 58 Gerrit-Comment-Date: Mon, 06 Mar 2023 21:14:08 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Kurt Deschler has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 58: Changes look good but please split the effective parallelism work into a separate patch so that we can manage the two separately and authors are correctly represented. -- Gerrit-PatchSet: 58 Gerrit-Comment-Date: Mon, 06 Mar 2023 19:02:58 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 58: Code-Review+2 (1 comment) Thank you Qifan and Wenzhe! I mentioned some follow-up work in the commit message. Carrying over the +2. http://gerrit.cloudera.org:8080/#/c/19033/54/fe/src/main/java/org/apache/impala/planner/BaseProcessingCost.java File fe/src/main/java/org/apache/impala/planner/BaseProcessingCost.java: http://gerrit.cloudera.org:8080/#/c/19033/54/fe/src/main/java/org/apache/impala/planner/BaseProcessingCost.java@30 PS54, Line 30: long cardinality, float exprsCost, float materializationCost) { > For VARCHAR, we can use some kind of average width stats, if available. Fo Filed IMPALA-11972 for this. -- Gerrit-PatchSet: 58 Gerrit-Comment-Date: Mon, 06 Mar 2023 17:30:44 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has uploaded a new patch set (#58) to the change originally created by Qifan Chen. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage ..

IMPALA-11604 Planner changes for CPU usage

This patch augments IMPALA-10992 by establishing an infrastructure that allows the weighted total amount of data to process to be used as a new factor in the definition and selection of an executor group. At the basis of the CPU costing model, we define ProcessingCost as the cost for a distinct PlanNode / DataSink / PlanFragment to process its input rows globally across all of its instances. The costing algorithm then tries to adjust the number of instances for each fragment by considering their production-consumption ratio, and finally returns a number representing the ideal CPU core count required for a query to run efficiently. A more detailed explanation of the CPU costing algorithm is given in the four steps below.

I. Compute ProcessingCost for each plan node and data sink.

The ProcessingCost of a PlanNode/DataSink is the weighted amount of data processed by that node/sink. The basic ProcessingCost is computed with the following general formula.

  ProcessingCost is a pair: PC(D, N), where D = I * (C + M)

  where D is the weighted amount of data processed
        I is the input cardinality
        C is the expression evaluation cost per row, set to the total weight of expression evaluation in the node/sink
        M is the materialization cost per row, only used by scan nodes, exchange nodes, and DataStreamSink; otherwise 0
        N is the number of instances. It defaults to D / MIN_COST_PER_THREAD (1 million), but is fixed for certain nodes/sinks and adjustable in step III.

In this patch, the weight of each expression evaluation is set to a constant of 1. The computation for each kind of PlanNode/DataSink is described below.

01. AggregationNode: Each AggregateInfo has its C computed as the sum of its grouping expression and aggregate expression costs, and is assigned its own ProcessingCost. These ProcessingCosts are then summed to form the AggregationNode's ProcessingCost;
02. AnalyticEvalNode: C is the sum of the evaluation costs of the analytic functions;
03. CardinalityCheckNode: Use the general formula, I = 1;
04. DataSourceScanNode: Follow the formula from the superclass ScanNode;
05. EmptySetNode: I = 0;
06. ExchangeNode: M = (average serialized row size) / 1024. In broadcast mode the general formula is modified to D = D * number of receivers;
07. HashJoinNode:
      probe cost = PC(I0 * C(equiJoin predicate), N) + PC(output cardinality * C(otherJoin predicate), N)
      build cost = PC(I1 * C(equiJoin predicate), N)
    where I0 and I1 are the input cardinalities of the probe and build sides, respectively. If the plan node does not have a separate build, ProcessingCost is the sum of the probe cost and build cost. Otherwise, ProcessingCost is equal to the probe cost.
08. HBaseScanNode, HdfsScanNode, and KuduScanNode: Follow the formula from the superclass ScanNode;
09. Nested loop join node:
    When the right child is not a SingularRowSrcNode:
      probe cost = PC(I0 * C(equiJoin predicate), N) + PC(output cardinality * C(otherJoin predicate), N)
      build cost = PC(I1 * C(equiJoin predicate), N)
    When the right child is a SingularRowSrcNode:
      probe cost = PC(I0, N)
      build cost = PC(I0 * I1, N)
    where I0 and I1 are the input cardinalities of the probe and build sides, respectively. If the plan node does not have a separate build, ProcessingCost is the sum of the probe cost and build cost. Otherwise, ProcessingCost is equal to the probe cost.
10. ScanNode: M = (average row size) / 1024;
11. SelectNode: Use the general formula;
12. SingularRowSrcNode: Since the node is involved once per input in a nested loop join, its contribution is computed in the nested loop join;
13. SortNode: C is the evaluation cost of the sort expressions;
14. SubplanNode: C is 1. I is the product of the cardinalities of the left and right children;
15. Union node: C is the cost of result expression evaluation from all non-pass-through children;
16. Unnest node: I is the cardinality of the containing SubplanNode and C is 1.
17. DataStreamSink: M = 1 / num rows per batch.
18. JoinBuildSink: ProcessingCost is the build cost of its associated JoinNode.
19. PlanRootSink: If result spooling is enabled, C is the cost of output expression evaluation. Otherwise, ProcessingCost is zero.
20. TableSink: C is the cost of output expression evaluation. TableSink subclasses (including HBaseTableSink, HdfsTableSink, and KuduTableSink) follow the same formula;

II. Compute the total ProcessingCost of a
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 58: Build started: https://jenkins.impala.io/job/gerrit-verify-dryrun/9113/ DRY_RUN=true -- Gerrit-PatchSet: 58 Gerrit-Comment-Date: Mon, 06 Mar 2023 17:32:07 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Qifan Chen has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 57: Code-Review+2 (2 comments) +2 to include Wenzhe's +1. http://gerrit.cloudera.org:8080/#/c/19033/54/fe/src/main/java/org/apache/impala/planner/BaseProcessingCost.java File fe/src/main/java/org/apache/impala/planner/BaseProcessingCost.java: http://gerrit.cloudera.org:8080/#/c/19033/54/fe/src/main/java/org/apache/impala/planner/BaseProcessingCost.java@30 PS54, Line 30: long cardinality, float exprsCost, float materializationCost) { > Agree that row width might factor in the PC for some operator. Is fact, I a For VARCHAR, we can use some kind of average width stats, if available. For fixed width columns, we just use the width. In both cases, the unit should be bytes, at least in the first draft. The idea of including width in costing is to make the outcome as precise and as little error-prone as possible. I am okay with making the change in the next iteration. Since this is very important, maybe create a new JIRA and refer to it in the commit message. http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/ProcessingCost.java File fe/src/main/java/org/apache/impala/planner/ProcessingCost.java: http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/ProcessingCost.java@270 PS52, Line 270: > Let say, I see. Thanks for the examples. I agree the use of > is fine.
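The width-aware costing suggested here (later filed as IMPALA-11972) could look roughly like the hypothetical sketch below: fixed-width columns contribute their byte width, VARCHAR columns contribute an average-width statistic when one is available, and the row total is divided by 1024 in the same way as the ScanNode formula M = (average row size) / 1024. The ColumnInfo class, all method names, and the fallback to the declared maximum length when no statistic exists are assumptions for illustration; the thread does not say what should happen without stats, and none of this is an Impala API.

```java
import java.util.List;
import java.util.OptionalDouble;

// Hypothetical sketch of width-aware row sizing; not an Impala API.
public final class RowWidthSketch {
  /** Illustrative column descriptor (assumption, not an Impala class). */
  static final class ColumnInfo {
    final boolean isVarLen;             // true for VARCHAR-like columns
    final int declaredWidthBytes;       // fixed width, or declared max length
    final OptionalDouble avgWidthBytes; // average width statistic, if any

    ColumnInfo(boolean isVarLen, int declaredWidthBytes, OptionalDouble avgWidthBytes) {
      this.isVarLen = isVarLen;
      this.declaredWidthBytes = declaredWidthBytes;
      this.avgWidthBytes = avgWidthBytes;
    }
  }

  private RowWidthSketch() {}

  /** Fixed width: its width. Variable width: avg stat, else declared max (assumption). */
  static double columnWidthBytes(ColumnInfo c) {
    if (!c.isVarLen) {
      return c.declaredWidthBytes;
    }
    return c.avgWidthBytes.orElse(c.declaredWidthBytes);
  }

  /** Sum of per-column widths, in bytes. */
  static double rowWidthBytes(List<ColumnInfo> columns) {
    double total = 0.0;
    for (ColumnInfo c : columns) {
      total += columnWidthBytes(c);
    }
    return total;
  }

  /** Same scaling as the ScanNode formula: M = (row size in bytes) / 1024. */
  static double materializationCost(List<ColumnInfo> columns) {
    return rowWidthBytes(columns) / 1024.0;
  }

  public static void main(String[] args) {
    List<ColumnInfo> cols = List.of(
        new ColumnInfo(false, 8, OptionalDouble.empty()),   // e.g. BIGINT
        new ColumnInfo(true, 100, OptionalDouble.of(24.0)), // VARCHAR(100), avg 24 bytes
        new ColumnInfo(true, 256, OptionalDouble.empty())); // VARCHAR(256), no stats
    System.out.println(rowWidthBytes(cols));       // 288.0
    System.out.println(materializationCost(cols)); // 0.28125
  }
}
```

Falling back to the declared maximum is a conservative choice (it overestimates cost when stats are missing); a real implementation might prefer a different default.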
-- Gerrit-PatchSet: 57 Gerrit-Comment-Date: Mon, 06 Mar 2023 14:20:40 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 57: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/12541/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- Gerrit-PatchSet: 57 Gerrit-Comment-Date: Sun, 05 Mar 2023 05:36:12 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 57: Build Failed https://jenkins.impala.io/job/gerrit-code-review-checks/12539/ : Initial code review checks failed. See linked job for details on the failure. -- Gerrit-PatchSet: 57 Gerrit-Comment-Date: Sat, 04 Mar 2023 14:26:58 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 56: Build Failed https://jenkins.impala.io/job/gerrit-code-review-checks/12538/ : Initial code review checks failed. See linked job for details on the failure. -- Gerrit-PatchSet: 56 Gerrit-Comment-Date: Sat, 04 Mar 2023 11:58:40 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 55: Build Failed https://jenkins.impala.io/job/gerrit-code-review-checks/12536/ : Initial code review checks failed. See linked job for details on the failure. -- Gerrit-PatchSet: 55 Gerrit-Comment-Date: Sat, 04 Mar 2023 06:55:56 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Wenzhe Zhou has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 57: Code-Review+1 -- Gerrit-PatchSet: 57 Gerrit-Comment-Date: Sat, 04 Mar 2023 00:29:07 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 57: (5 comments) Done. Sorry for duplicating test_query_assignment_with_two_exec_groups. http://gerrit.cloudera.org:8080/#/c/19033/55/tests/custom_cluster/test_executor_groups.py File tests/custom_cluster/test_executor_groups.py: http://gerrit.cloudera.org:8080/#/c/19033/55/tests/custom_cluster/test_executor_groups.py@744 PS55, Line 744: fair-scheduler-3-groups.xml > nit: should we rename the file since we add one more group? Done http://gerrit.cloudera.org:8080/#/c/19033/55/tests/custom_cluster/test_executor_groups.py@747 PS55, Line 747: three set > nit: add one more group Done http://gerrit.cloudera.org:8080/#/c/19033/55/tests/custom_cluster/test_executor_groups.py@750 PS55, Line 750: > nit: should we rename the file since we add one more group? Done http://gerrit.cloudera.org:8080/#/c/19033/55/tests/custom_cluster/test_executor_groups.py@753 PS55, Line 753: small and large. > Could we keep consistent with group names in llama-site-2-groups.xml and fa Reverted this. It looks like I accidentally edited this. test_query_assignment_with_two_exec_groups will continue to test with two executor groups only. http://gerrit.cloudera.org:8080/#/c/19033/55/tests/custom_cluster/test_executor_groups.py@793 PS55, Line 793: _DIR = os.path.j > nit: three group sets: tiny, Done -- Gerrit-PatchSet: 57 Gerrit-Comment-Date: Sat, 04 Mar 2023 00:14:19 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has uploaded a new patch set (#57) to the change originally created by Qifan Chen. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage ..
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has uploaded a new patch set (#56) to the change originally created by Qifan Chen. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage ..
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Wenzhe Zhou has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 55: (5 comments) Thanks for addressing all of my comments. Just a few comments on the new test. The rest looks good to me. http://gerrit.cloudera.org:8080/#/c/19033/55/tests/custom_cluster/test_executor_groups.py File tests/custom_cluster/test_executor_groups.py: http://gerrit.cloudera.org:8080/#/c/19033/55/tests/custom_cluster/test_executor_groups.py@744 PS55, Line 744: fair-scheduler-2-groups.xml nit: should we rename the file since we add one more group? http://gerrit.cloudera.org:8080/#/c/19033/55/tests/custom_cluster/test_executor_groups.py@747 PS55, Line 747: two sets: nit: add one more group http://gerrit.cloudera.org:8080/#/c/19033/55/tests/custom_cluster/test_executor_groups.py@750 PS55, Line 750: llama-site-2-groups.xml nit: should we rename the file since we add one more group? http://gerrit.cloudera.org:8080/#/c/19033/55/tests/custom_cluster/test_executor_groups.py@753 PS55, Line 753: root.small:1,root:medium:2,root.large:3 Could we keep consistent with group names in llama-site-2-groups.xml and fair-scheduler-2-groups.xml? http://gerrit.cloudera.org:8080/#/c/19033/55/tests/custom_cluster/test_executor_groups.py@793 PS55, Line 793: two group sets: nit: three group sets: tiny, -- Gerrit-PatchSet: 55 Gerrit-Comment-Date: Fri, 03 Mar 2023 23:41:24 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 55: (11 comments) http://gerrit.cloudera.org:8080/#/c/19033/54//COMMIT_MSG Commit Message: http://gerrit.cloudera.org:8080/#/c/19033/54//COMMIT_MSG@295 PS54, Line 295: . > Need to mention the default value. Maybe set it to MT_DOP? Added clarification. http://gerrit.cloudera.org:8080/#/c/19033/54//COMMIT_MSG@328 PS54, Line 328: processing_cost_use_equal_expr_weight=false. : : Q3 : CoreCount={total=12 trace=F00:12} : : Q12 : CoreCount={total=12 trace=F00:12} : > indentation Done http://gerrit.cloudera.org:8080/#/c/19033/54/common/thrift/ImpalaService.thrift File common/thrift/ImpalaService.thrift: http://gerrit.cloudera.org:8080/#/c/19033/54/common/thrift/ImpalaService.thrift@768 PS54, Line 768: It is recommend to not set i > What's effect if setting this option as 128 for executors with 8 cores? Sho There is a possibility that query parallelism scales too high and does not fit any executor group set. Added a warning not to set it to a value greater than the number of physical cores in an executor node. http://gerrit.cloudera.org:8080/#/c/19033/54/fe/src/main/java/org/apache/impala/planner/BaseProcessingCost.java File fe/src/main/java/org/apache/impala/planner/BaseProcessingCost.java: http://gerrit.cloudera.org:8080/#/c/19033/54/fe/src/main/java/org/apache/impala/planner/BaseProcessingCost.java@30 PS54, Line 30: long cardinality, float exprsCost, float materializationCost) { > I think we should factor in the row width in computing PC, as row width can Agree that row width might factor into the PC for some operators. In fact, I added the materialization cost here to accommodate PCs where row width should factor in. Currently, the PCs of ScanNode, ExchangeNode, and DataStreamSink have row width factored in through the materialization parameter here.
There is also the question of whether fixed-length columns should be treated the same as variable-length columns. Or can some operators ignore row width because CodeGen-ed code can handle it very efficiently? Ultimately, I think further research is needed to determine which query operators should care about row width. I hope it is OK to get the current costing infrastructure merged first and improve it in the next iteration. http://gerrit.cloudera.org:8080/#/c/19033/54/fe/src/main/java/org/apache/impala/planner/CoreCount.java File fe/src/main/java/org/apache/impala/planner/CoreCount.java: http://gerrit.cloudera.org:8080/#/c/19033/54/fe/src/main/java/org/apache/impala/planner/CoreCount.java@31 PS54, Line 31: , computed > CPU cores, computed from the CPU cost, Done http://gerrit.cloudera.org:8080/#/c/19033/54/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java File fe/src/main/java/org/apache/impala/planner/ExchangeNode.java: http://gerrit.cloudera.org:8080/#/c/19033/54/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java@203 PS54, Line 203: costs per row a > nit: costs per row are Done http://gerrit.cloudera.org:8080/#/c/19033/54/fe/src/main/java/org/apache/impala/planner/Planner.java File fe/src/main/java/org/apache/impala/planner/Planner.java: http://gerrit.cloudera.org:8080/#/c/19033/54/fe/src/main/java/org/apache/impala/planner/Planner.java@480 PS54, Line 480: max > max? Yes! Thank you for catching this. http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/ProcessingCost.java File fe/src/main/java/org/apache/impala/planner/ProcessingCost.java: http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/ProcessingCost.java@270 PS52, Line 270: > Not sure I follow. Maybe an example? Let's say nodeStepCount = 3, getNumInstancesExpected = 6, and maxInstance = 9. Then the consumer is still allowed to scale up the number of instances by 3 (6 + 3 = 9).
However, let's say nodeStepCount = 3, getNumInstancesExpected = 6, and maxInstance = 8. Then the consumer is already at its highest count, since it is not possible to scale up by 3 without exceeding maxInstance = 8. http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/SegmentCost.java File fe/src/main/java/org/apache/impala/planner/SegmentCost.java: http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/SegmentCost.java@78 PS52, Line 78: > Yeah a revisit will help. It sounds like we need to deal with the unknown c I'm not sure yet what the best strategy is for dealing with missing cardinality. I suspect that, in the current state, the worst case will adjust to the maximum possible (but still bounded by num_cores_per_executor from the exec group set and PROCESSING_COST_MIN_THREADS). I plan to revisit this in the next iteration.
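The two cases in the reply above reduce to one comparison. A minimal sketch, assuming the three quantities mean what the example implies; the class and method names are hypothetical and are not the code in ProcessingCost.java:

```java
// Hypothetical sketch of the scale-up check discussed for ProcessingCost.java@270.
final class ScaleUpSketch {
  /**
   * Returns true when a consumer fragment is already at its highest usable
   * instance count, i.e. scaling up by one more step would exceed maxInstance.
   * Landing exactly on maxInstance is still allowed, hence '>' rather than '>='.
   */
  static boolean atHighestCount(int nodeStepCount, int numInstancesExpected, int maxInstance) {
    return numInstancesExpected + nodeStepCount > maxInstance;
  }

  public static void main(String[] args) {
    // First case from the reply: 6 + 3 = 9 does not exceed 9, so scaling up is allowed.
    System.out.println(atHighestCount(3, 6, 9)); // false
    // Second case from the reply: 6 + 3 = 9 exceeds 8, so the consumer is at its highest count.
    System.out.println(atHighestCount(3, 6, 8)); // true
  }
}
```

Using '>' lets a fragment land exactly on maxInstance, which the first case (6 + 3 = 9) requires.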
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has uploaded a new patch set (#55) to the change originally created by Qifan Chen. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage ..

IMPALA-11604 Planner changes for CPU usage

This patch augments IMPALA-10992 by establishing an infrastructure that allows the weighted total amount of data to process to be used as a new factor in the definition and selection of an executor group. At the basis of the CPU costing model, we define ProcessingCost as the cost for a distinct PlanNode / DataSink / PlanFragment to process its input rows globally across all of its instances. The costing algorithm then tries to adjust the number of instances for each fragment by considering their production-consumption ratio, and finally returns a number representing the ideal CPU core count required for a query to run efficiently. The CPU costing algorithm is explained in more detail in the four steps below.

I. Compute ProcessingCost for each plan node and data sink.

The ProcessingCost of a PlanNode/DataSink is the weighted amount of data processed by that node/sink. The basic ProcessingCost is a pair PC(D, N), computed with the following general formula:

  D = I * (C + M)

where
  D is the weighted amount of data processed;
  I is the input cardinality;
  C is the expression evaluation cost per row, set to the total weight of expression evaluation in the node/sink;
  M is the materialization cost per row. It is used only by scan nodes, exchange nodes, and DataStreamSink; otherwise it is 0;
  N is the number of instances. It defaults to D / MIN_COST_PER_THREAD (1 million), but is fixed for certain nodes/sinks and adjustable in step III.

In this patch, the weight of each expression evaluation is set to a constant of 1. The computation for each kind of PlanNode/DataSink is described below.

01. AggregationNode: Each AggregateInfo has its C set to the sum of its grouping and aggregate expressions and is assigned its own ProcessingCost. These ProcessingCosts are then summed to form the AggregationNode's ProcessingCost;
02. AnalyticEvalNode: C is the sum of the evaluation costs of the analytic functions;
03. CardinalityCheckNode: Uses the general formula with I = 1;
04. DataSourceScanNode: Follows the formula of the superclass ScanNode;
05. EmptySetNode: I = 0;
06. ExchangeNode: M = (average serialized row size) / 1024. In broadcast mode, the general formula is modified to D = D * number of receivers;
07. HashJoinNode:
      probe cost = PC(I0 * C(equiJoin predicate), N) + PC(output cardinality * C(otherJoin predicate), N)
      build cost = PC(I1 * C(equiJoin predicate), N)
    where I0 and I1 are the input cardinalities of the probe and build sides, respectively. If the plan node does not have a separate build, its ProcessingCost is the sum of the probe cost and the build cost. Otherwise, its ProcessingCost equals the probe cost.
08. HbaseScanNode, HdfsScanNode, and KuduScanNode: Follow the formula of the superclass ScanNode;
09. Nested loop join node:
    When the right child is not a SingularRowSrcNode:
      probe cost = PC(I0 * C(equiJoin predicate), N) + PC(output cardinality * C(otherJoin predicate), N)
      build cost = PC(I1 * C(equiJoin predicate), N)
    When the right child is a SingularRowSrcNode:
      probe cost = PC(I0, N)
      build cost = PC(I0 * I1, N)
    where I0 and I1 are the input cardinalities of the probe and build sides, respectively. If the plan node does not have a separate build, its ProcessingCost is the sum of the probe cost and the build cost. Otherwise, its ProcessingCost equals the probe cost.
10. ScanNode: M = (average row size) / 1024;
11. SelectNode: Uses the general formula;
12. SingularRowSrcNode: Since this node is involved once per input in a nested loop join, its contribution is computed in the nested loop join;
13. SortNode: C is the evaluation cost of the sort expressions;
14. SubplanNode: C is 1, and I is the product of the cardinalities of the left and right children;
15. Union node: C is the cost of result expression evaluation from all non-pass-through children;
16. Unnest node: I is the cardinality of the containing SubplanNode, and C is 1;
17. DataStreamSink: M = 1 / (number of rows per batch);
18. JoinBuildSink: ProcessingCost is the build cost of its associated JoinNode;
19. PlanRootSink: If result spooling is enabled, C is the cost of output expression evaluation. Otherwise, ProcessingCost is zero;
20. TableSink: C is the cost of output expression evaluation. TableSink subclasses (HBaseTableSink, HdfsTableSink, and KuduTableSink) follow the same formula;

II. Compute the total ProcessingCost of a
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Qifan Chen has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 54: (8 comments) Looks great! Thanks a lot for the changes; some of them are significant. http://gerrit.cloudera.org:8080/#/c/19033/54//COMMIT_MSG Commit Message: http://gerrit.cloudera.org:8080/#/c/19033/54//COMMIT_MSG@295 PS54, Line 295: . Need to mention the default value. Maybe set it to MT_DOP? Should we also mention PROCESSING_COST_MAX_THREADS, which could default to the max # of cores? http://gerrit.cloudera.org:8080/#/c/19033/54//COMMIT_MSG@328 PS54, Line 328: Q3 : CoreCount={total=12 trace=F00:12} : : Q12 : CoreCount={total=12 trace=F00:12} : : Q15 : CoreCount={total=15 trace=N07:3+F00:12} indentation http://gerrit.cloudera.org:8080/#/c/19033/54/fe/src/main/java/org/apache/impala/planner/BaseProcessingCost.java File fe/src/main/java/org/apache/impala/planner/BaseProcessingCost.java: http://gerrit.cloudera.org:8080/#/c/19033/54/fe/src/main/java/org/apache/impala/planner/BaseProcessingCost.java@30 PS54, Line 30: long cardinality, float exprsCost, float materializationCost) { I think we should factor in the row width in computing PC, as row width can vary a lot. Without considering the width, the computed PC may not be right. PC = cardinality * width * (expr cost + materialization cost).
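To make the two positions concrete, here is a minimal Java sketch of the commit message's general formula D = I * (C + M), with the ScanNode materialization cost M = (average row size) / 1024 and the default N = D / MIN_COST_PER_THREAD (1 million), next to the width-weighted variant proposed in this comment. Class and method names are hypothetical; rounding N up and flooring it at 1 are assumptions, not the patch's behavior.

```java
// Hypothetical sketch of D = I * (C + M) and the default instance count N.
final class ProcessingCostSketch {
  // MIN_COST_PER_THREAD is given as 1 million in the commit message.
  static final double MIN_COST_PER_THREAD = 1_000_000.0;

  /** D: weighted amount of data processed, I * (C + M). */
  static double weightedData(long inputCardinality, double exprCostPerRow,
      double materializationCostPerRow) {
    return inputCardinality * (exprCostPerRow + materializationCostPerRow);
  }

  /** ScanNode materialization cost per row: M = average row size / 1024. */
  static double scanMaterialization(double avgRowSizeBytes) {
    return avgRowSizeBytes / 1024.0;
  }

  /** The reviewer's proposed variant, I * width * (C + M); not what the patch implements. */
  static double widthWeighted(long inputCardinality, double rowWidthBytes,
      double exprCostPerRow, double materializationCostPerRow) {
    return inputCardinality * rowWidthBytes * (exprCostPerRow + materializationCostPerRow);
  }

  /** Default N = D / MIN_COST_PER_THREAD; rounding up with a floor of 1 is an assumption. */
  static long defaultInstances(double weightedData) {
    return Math.max(1L, (long) Math.ceil(weightedData / MIN_COST_PER_THREAD));
  }

  public static void main(String[] args) {
    long rows = 1_000_000L;
    double c = 1.0; // one expression; each expression is weighted as 1 in this patch
    double narrow = weightedData(rows, c, scanMaterialization(128)); // 1.125e6
    double wide = weightedData(rows, c, scanMaterialization(4096));  // 5.0e6
    System.out.println("narrow N=" + defaultInstances(narrow)); // narrow N=2
    System.out.println("wide N=" + defaultInstances(wide));     // wide N=5
  }
}
```

For scans, row width already reaches D through M: two scans with equal cardinality but different average row sizes end up with different costs and instance counts.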
http://gerrit.cloudera.org:8080/#/c/19033/54/fe/src/main/java/org/apache/impala/planner/CoreCount.java File fe/src/main/java/org/apache/impala/planner/CoreCount.java: http://gerrit.cloudera.org:8080/#/c/19033/54/fe/src/main/java/org/apache/impala/planner/CoreCount.java@31 PS54, Line 31: requirement CPU cores, computed from the CPU cost, http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/ProcessingCost.java File fe/src/main/java/org/apache/impala/planner/ProcessingCost.java: http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/ProcessingCost.java@270 PS52, Line 270: > I think '>" is the correct sign here? Not sure I follow. Maybe an example? http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/SegmentCost.java File fe/src/main/java/org/apache/impala/planner/SegmentCost.java: http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/SegmentCost.java@78 PS52, Line 78: > When I first work on this, I intentionally made ProcessingCost to not accep Yeah a revisit will help. It sounds like we need to deal with the unknown cost (-1 in cardinality) as well. Maybe we do not adjust in such situations? http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/SegmentCost.java@102 PS52, Line 102: > I decide to remove traverseBlockingAwareCost() method. I see. I just wonder whether the algorithm could be modified to not check the state of the children. In other words, each child could supply an expected core count that is available when the parent node is being processed. http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/SegmentCost.java@118 PS52, Line 118: > In traverseBlockingAwareCores(), I changed the loop to iterate all children Sounds like a conservative strategy. Done.
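The Q15 example quoted in this review, CoreCount={total=15 trace=N07:3+F00:12}, suggests that CoreCount pairs plan ids with per-id core counts. Below is a hypothetical sketch of such a value class. It includes the list-length check requested elsewhere in this review, and it assumes that total is the sum of the per-id counts, which matches the quoted examples. It is not the actual CoreCount.java.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical CoreCount-like value holding parallel lists of plan ids and core counts.
final class CoreCountSketch {
  private final List<String> ids;
  private final List<Integer> counts;
  private final int total;

  CoreCountSketch(List<String> ids, List<Integer> counts) {
    // Sanity check suggested in review: both lists must have the same length.
    if (ids.size() != counts.size()) {
      throw new IllegalArgumentException(
          "ids and counts differ in length: " + ids.size() + " vs " + counts.size());
    }
    this.ids = Collections.unmodifiableList(new ArrayList<>(ids));
    this.counts = Collections.unmodifiableList(new ArrayList<>(counts));
    int sum = 0;
    for (int c : counts) {
      sum += c;
    }
    this.total = sum; // assumed: total is the sum of the per-id counts
  }

  int total() {
    return total;
  }

  @Override
  public String toString() {
    StringBuilder trace = new StringBuilder();
    for (int i = 0; i < ids.size(); i++) {
      if (i > 0) {
        trace.append('+');
      }
      trace.append(ids.get(i)).append(':').append(counts.get(i));
    }
    return "CoreCount={total=" + total + " trace=" + trace + "}";
  }

  public static void main(String[] args) {
    CoreCountSketch q15 = new CoreCountSketch(Arrays.asList("N07", "F00"), Arrays.asList(3, 12));
    System.out.println(q15); // CoreCount={total=15 trace=N07:3+F00:12}
  }
}
```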
-- To view, visit http://gerrit.cloudera.org:8080/19033 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 54 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Fri, 03 Mar 2023 14:54:41 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Wenzhe Zhou has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 54: (4 comments) Looks good to me. http://gerrit.cloudera.org:8080/#/c/19033/54/common/thrift/ImpalaService.thrift File common/thrift/ImpalaService.thrift: http://gerrit.cloudera.org:8080/#/c/19033/54/common/thrift/ImpalaService.thrift@768 PS54, Line 768: Valid values are in [1, 128] What's effect if setting this option as 128 for executors with 8 cores? Should we recommend not setting it with value more than number of physical cores? http://gerrit.cloudera.org:8080/#/c/19033/54/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java File fe/src/main/java/org/apache/impala/planner/ExchangeNode.java: http://gerrit.cloudera.org:8080/#/c/19033/54/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java@203 PS54, Line 203: cost per row is nit: costs per row are http://gerrit.cloudera.org:8080/#/c/19033/54/fe/src/main/java/org/apache/impala/planner/Planner.java File fe/src/main/java/org/apache/impala/planner/Planner.java: http://gerrit.cloudera.org:8080/#/c/19033/54/fe/src/main/java/org/apache/impala/planner/Planner.java@480 PS54, Line 480: min max? http://gerrit.cloudera.org:8080/#/c/19033/54/tests/custom_cluster/test_executor_groups.py File tests/custom_cluster/test_executor_groups.py: http://gerrit.cloudera.org:8080/#/c/19033/54/tests/custom_cluster/test_executor_groups.py@842 PS54, Line 842: Expect to run the query on the small group Could you define three executor groups? The default divisor would use the medium-sized group, divisor 2 the small group, and divisor 0.2 the large group.
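The three-group proposal in this comment can be illustrated with a small sketch. It assumes that query_cpu_count_divisor divides the planner's core count and that the smallest executor group set with enough cores is chosen. The class, method names, group names, and core totals (8, 16, 64) are hypothetical and chosen only for illustration.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: pick an executor group set from a divisor-scaled core requirement.
final class GroupSetSelectionSketch {
  static final class GroupSet {
    final String name;
    final int totalCores;

    GroupSet(String name, int totalCores) {
      this.name = name;
      this.totalCores = totalCores;
    }
  }

  /** Assumed semantics: the requirement is the planner's core count divided by the divisor. */
  static int scaledCores(int coreCount, double divisor) {
    if (divisor <= 0.0) {
      throw new IllegalArgumentException("divisor must be positive");
    }
    return (int) Math.ceil(coreCount / divisor);
  }

  /** Returns the smallest group set (list sorted by size) that fits, else the largest one. */
  static GroupSet select(List<GroupSet> sortedBySize, int coreCount, double divisor) {
    if (sortedBySize.isEmpty()) {
      throw new IllegalArgumentException("no executor group sets");
    }
    int needed = scaledCores(coreCount, divisor);
    for (GroupSet gs : sortedBySize) {
      if (gs.totalCores >= needed) {
        return gs;
      }
    }
    return sortedBySize.get(sortedBySize.size() - 1);
  }

  public static void main(String[] args) {
    List<GroupSet> sets = Arrays.asList(
        new GroupSet("small", 8), new GroupSet("medium", 16), new GroupSet("large", 64));
    for (double divisor : new double[] {1.0, 2.0, 0.2}) {
      System.out.println(divisor + " -> " + select(sets, 12, divisor).name);
    }
  }
}
```

With a core count of 12, divisors 1.0, 2.0, and 0.2 select medium, small, and large respectively. This matches the mapping proposed in the comment.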
-- To view, visit http://gerrit.cloudera.org:8080/19033 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 54 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Fri, 03 Mar 2023 05:25:35 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 54: (42 comments) Patch set 54 addresses comments from patch sets 51 and 52. I decided to remove some code that is no longer very useful, to narrow down the review. I will add it back in the future if it is deemed important to have. PROCESSING_COST_ALLOW_THREAD_INCREMENT and PROCESSING_COST_MAX_THREAD are now removed. PROCESSING_COST_MIN_THREAD, MT_DOP, min_processing_per_thread, and num_cores_per_executor from the selected executor group set now decide the minimum and maximum parallelism per fragment. http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG Commit Message: http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG@301 PS51, Line 301: executor group with total :available CPU X. Note that setting with a fractional > Ack. Will take a look at this. Repurposed PROCESSING_COST_MAX_THREADS into PROCESSING_COST_MIN_THREADS. http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG@304 PS51, Line 304:> 0.0. The default value is 1. : : 2. processing_cost_use_equal_expr_weight :If true, all expression evaluations are weighted equally to 1 during :the plan node's processing cost calculation. If false, expression :cost from IMP > I'll look if I can core core limit from IMPALA-11617. PS54 passes num_cores_per_executor from the selected executor group set as the max thread count. The final EDoP can still exceed the number of cores available in the selected group set, in which case the Frontend will need to replan with the next larger group set.
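A minimal sketch of how per-node parallelism might be bounded by these settings. It assumes the cost-derived count is rounded up and clamped between a minimum (standing in for PROCESSING_COST_MIN_THREADS) and a maximum (standing in for num_cores_per_executor of the selected group set), with minProcessingPerThread standing in for min_processing_per_thread. The function name and the exact roles of the parameters are hypothetical, not the patch's code.

```java
// Hypothetical sketch: derive a per-node instance count from cost, then clamp it.
final class ParallelismBoundsSketch {
  /**
   * ceil(costPerNode / minProcessingPerThread), clamped to [minThreads, maxThreads].
   * minThreads stands in for PROCESSING_COST_MIN_THREADS and maxThreads for
   * num_cores_per_executor of the selected executor group set (assumed roles).
   */
  static int instancesPerNode(long costPerNode, long minProcessingPerThread,
      int minThreads, int maxThreads) {
    if (minProcessingPerThread <= 0) {
      throw new IllegalArgumentException("minProcessingPerThread must be positive");
    }
    if (minThreads < 1 || maxThreads < minThreads) {
      throw new IllegalArgumentException("need 1 <= minThreads <= maxThreads");
    }
    // Integer ceiling division; assumes a non-negative cost.
    long byCost = (costPerNode + minProcessingPerThread - 1) / minProcessingPerThread;
    return (int) Math.max(minThreads, Math.min(maxThreads, byCost));
  }

  public static void main(String[] args) {
    System.out.println(instancesPerNode(5_000_000L, 1_000_000L, 1, 8));  // 5
    System.out.println(instancesPerNode(50_000_000L, 1_000_000L, 1, 8)); // 8 (capped)
    System.out.println(instancesPerNode(0L, 1_000_000L, 2, 8));          // 2 (floored)
  }
}
```

Because the clamp applies per node, the sum across fragments can still exceed the group set's total cores. That is the case in which the Frontend has to replan with the next larger group set.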
http://gerrit.cloudera.org:8080/#/c/19033/52/be/src/scheduling/scheduler.h File be/src/scheduling/scheduler.h: http://gerrit.cloudera.org:8080/#/c/19033/52/be/src/scheduling/scheduler.h@432 PS52, Line 432: eturn true if 'pl > Could we change this new function, ContainsNode and following two new funct Done http://gerrit.cloudera.org:8080/#/c/19033/52/be/src/util/backend-gflag-util.cc File be/src/util/backend-gflag-util.cc: http://gerrit.cloudera.org:8080/#/c/19033/52/be/src/util/backend-gflag-util.cc@200 PS52, Line 200: query_cpu_count_divisor, 1.0 > Is it possible to add test cases with divisor as 0.5 and 2.0? Added TestExecutorGroups::test_query_cpu_count_divisor. http://gerrit.cloudera.org:8080/#/c/19033/52/be/src/util/backend-gflag-util.cc@209 PS52, Line 209: eighted equally to 1 during the > add TODO comments for tune the expression cost Done http://gerrit.cloudera.org:8080/#/c/19033/52/be/src/util/backend-gflag-util.cc@212 PS52, Line 212: > This flag variable can be set as any positive number. Add what's the effect Explained the impact in the flag description. http://gerrit.cloudera.org:8080/#/c/19033/52/common/thrift/ImpalaService.thrift File common/thrift/ImpalaService.thrift: http://gerrit.cloudera.org:8080/#/c/19033/52/common/thrift/ImpalaService.thrift@771 PS52, Line 771: > nit: processing cost This query option is now deleted. http://gerrit.cloudera.org:8080/#/c/19033/52/common/thrift/Planner.thrift File common/thrift/Planner.thrift: http://gerrit.cloudera.org:8080/#/c/19033/52/common/thrift/Planner.thrift@90 PS52, Line 90: need > nit: need to Done http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java File fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java: http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java@724 PS52, Line 724: alCost(getGroupi > Do we need this unused parameter?
Don't see @Override annotation for this f Done http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/analysis/SortInfo.java File fe/src/main/java/org/apache/impala/analysis/SortInfo.java: http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/analysis/SortInfo.java@320 PS52, Line 320: alCost(getSortEx > unused parameter Done http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/AggregationNode.java File fe/src/main/java/org/apache/impala/planner/AggregationNode.java: http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/AggregationNode.java@511 PS52, Line 511: processingCost_ = ProcessingCost.zero(); : for (AggregateInfo aggInfo : aggInfos_) { > We can tweak a little to avoid this list and two for loops. Done http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/BroadcastProcessingCost.java File fe/src/main/java/org/apache/impala/planner/BroadcastProcessingCost.java: http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/BroadcastProcessingCost.java@31 PS52, Line 31: countSupplier mus > don't see multiplier in this class
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 54: Build Failed https://jenkins.impala.io/job/gerrit-code-review-checks/12525/ : Initial code review checks failed. See linked job for details on the failure. -- To view, visit http://gerrit.cloudera.org:8080/19033 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 54 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Fri, 03 Mar 2023 00:16:30 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has uploaded a new patch set (#54) to the change originally created by Qifan Chen. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage ..
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 53: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/12523/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- To view, visit http://gerrit.cloudera.org:8080/19033 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 53 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Thu, 02 Mar 2023 23:58:06 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 53: Patch set 53 updates the expected planner test output after switching the processing_cost_use_equal_expr_weight flag back to true. It does not address any comments from PS52 yet. -- To view, visit http://gerrit.cloudera.org:8080/19033 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 53 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Thu, 02 Mar 2023 23:40:20 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has uploaded a new patch set (#53) to the change originally created by Qifan Chen. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage ..
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Wenzhe Zhou has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 52: (15 comments) http://gerrit.cloudera.org:8080/#/c/19033/52/be/src/util/backend-gflag-util.cc File be/src/util/backend-gflag-util.cc: http://gerrit.cloudera.org:8080/#/c/19033/52/be/src/util/backend-gflag-util.cc@200 PS52, Line 200: query_cpu_count_divisor, 1.0 Is it possible to add test cases with divisor as 0.5 and 2.0? http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/BroadcastProcessingCost.java File fe/src/main/java/org/apache/impala/planner/BroadcastProcessingCost.java: http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/BroadcastProcessingCost.java@31 PS52, Line 31: Multiple supplier don't see multiplier in this class http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/BroadcastProcessingCost.java@62 PS52, Line 62: numInstanceSupplier_ call getNumInstanceExpected()? http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/CoreCount.java File fe/src/main/java/org/apache/impala/planner/CoreCount.java: http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/CoreCount.java@58 PS52, Line 58: ids_ = ids; Add Preconditions.check to make sure two lists have same length http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/DataSink.java File fe/src/main/java/org/apache/impala/planner/DataSink.java: http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/DataSink.java@75 PS52, Line 75: && explainLevel.ordinal() >= TExplainLevel.VERBOSE.ordinal()) { : // Print processing cost. 
: output.append(processingCost_.getExplainString(detailPrefix, false)); This can be moved after line #67. http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/DataSink.java@147 PS52, Line 147: fragment_.getPlanRoot().getCardinality() Can this be replaced with getNumRowsProduced()? Are the numbers of rows consumed and produced the same? http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java File fe/src/main/java/org/apache/impala/planner/ExchangeNode.java: http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java@245 PS52, Line 245: SerDe Too short and hard to read. http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/MultipleProcessingCost.java File fe/src/main/java/org/apache/impala/planner/MultipleProcessingCost.java: http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/MultipleProcessingCost.java@24 PS52, Line 24: MultipleProcessingCost > nit. Maybe ScaledProcessingCost? MultipleProcessingCost sounds like a list Or MultiplyProcessingCost? http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/MultipleProcessingCost.java@28 PS52, Line 28: multiple nit: multiplier?
http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/ProcessingCost.java File fe/src/main/java/org/apache/impala/planner/ProcessingCost.java: http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/ProcessingCost.java@39 PS52, Line 39: numInstances Add a Preconditions.check that numInstances is greater than 0. http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/SegmentCost.java File fe/src/main/java/org/apache/impala/planner/SegmentCost.java: http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/SegmentCost.java@72 PS52, Line 72: nodes_.size() What if nodes_.size() equals 0? http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/SegmentCost.java@83 PS52, Line 83: appendSinkCost > appendSink() or setSink()? http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/UnionNode.java File fe/src/main/java/org/apache/impala/planner/UnionNode.java: http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/UnionNode.java@155 PS52, Line 155: cost nit: costs http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/service/Frontend.java File fe/src/main/java/org/apache/impala/service/Frontend.java: http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/service/Frontend.java@273 PS52, Line 273: Certain queries such as EXPLAIN that do not populate : // TExecRequest.query_exec_request field nit: the sentence seems incomplete. Set the value to -1? http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/test/java/org/apache/impala/planner/PlannerTest.java File
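The two Preconditions requests above (equal-length lists in CoreCount, positive numInstances in ProcessingCost) can be sketched as follows. CoreCountSketch is a hypothetical stand-in for CoreCount, and its second list (counts_) is an assumption because the comment only shows ids_. Plain IllegalArgumentException checks keep the block compilable without Guava; Impala code would more likely call Guava's Preconditions.checkArgument.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for CoreCount; the counts_ list is an assumption.
final class CoreCountSketch {
  private final List<Integer> ids_;
  private final List<Integer> counts_;

  CoreCountSketch(List<Integer> ids, List<Integer> counts) {
    // Same intent as Guava's Preconditions.checkArgument(ids.size() == counts.size()).
    if (ids.size() != counts.size()) {
      throw new IllegalArgumentException(
          "ids and counts must have the same length: " + ids.size() + " vs " + counts.size());
    }
    ids_ = Collections.unmodifiableList(new ArrayList<>(ids));
    counts_ = Collections.unmodifiableList(new ArrayList<>(counts));
  }

  // Guard corresponding to the "numInstances greater than 0" request.
  static int checkNumInstances(int numInstances) {
    if (numInstances <= 0) {
      throw new IllegalArgumentException("numInstances must be > 0, got " + numInstances);
    }
    return numInstances;
  }

  int size() {
    return ids_.size();
  }

  // Sum of all per-id counts.
  int total() {
    int sum = 0;
    for (int c : counts_) sum += c;
    return sum;
  }
}
```

Failing fast in the constructor means a length mismatch surfaces at planning time rather than as an IndexOutOfBoundsException later.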
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Wenzhe Zhou has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 52: (8 comments) http://gerrit.cloudera.org:8080/#/c/19033/52/be/src/scheduling/scheduler.h File be/src/scheduling/scheduler.h: http://gerrit.cloudera.org:8080/#/c/19033/52/be/src/scheduling/scheduler.h@432 PS52, Line 432: ContainsUnionNode Could we make this new function, ContainsNode, and the following two new functions static? They don't seem to use any member variables. http://gerrit.cloudera.org:8080/#/c/19033/52/be/src/util/backend-gflag-util.cc File be/src/util/backend-gflag-util.cc: http://gerrit.cloudera.org:8080/#/c/19033/52/be/src/util/backend-gflag-util.cc@209 PS52, Line 209: expression cost from IMPALA-2805 Add a TODO comment to tune the expression cost. http://gerrit.cloudera.org:8080/#/c/19033/52/be/src/util/backend-gflag-util.cc@212 PS52, Line 212: // TODO: Benchmark and tune this config with an optimal value. This flag can be set to any positive number. Please describe the effect of setting the value too high or too low. http://gerrit.cloudera.org:8080/#/c/19033/52/common/thrift/ImpalaService.thrift File common/thrift/ImpalaService.thrift: http://gerrit.cloudera.org:8080/#/c/19033/52/common/thrift/ImpalaService.thrift@771 PS52, Line 771: costing nit: processing cost http://gerrit.cloudera.org:8080/#/c/19033/52/common/thrift/Planner.thrift File common/thrift/Planner.thrift: http://gerrit.cloudera.org:8080/#/c/19033/52/common/thrift/Planner.thrift@90 PS52, Line 90: need nit: need to http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java File fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java: http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java@724 PS52, Line 724: int numInstances Do we need this unused parameter?
I don't see an @Override annotation for this function. http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/analysis/SortInfo.java File fe/src/main/java/org/apache/impala/analysis/SortInfo.java: http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/analysis/SortInfo.java@320 PS52, Line 320: int numInstances Unused parameter. http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/AggregationNode.java File fe/src/main/java/org/apache/impala/planner/AggregationNode.java: http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/AggregationNode.java@511 PS52, Line 511: List processingCosts = : Lists.newArrayListWithCapacity(aggInfos_.size()); We can tweak this a little to avoid this list and the two for loops. -- To view, visit http://gerrit.cloudera.org:8080/19033 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 52 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Mon, 27 Feb 2023 20:12:39 + Gerrit-HasComments: Yes
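One way to read the AggregationNode suggestion is to accumulate each AggregateInfo's cost in a single loop instead of first filling processingCosts and then summing it in a second loop. This is a sketch under assumptions: AggInfo and its fields are hypothetical stand-ins, and only the D component of PC(D, N) is summed, because how N combines across AggregateInfos is not stated in this thread.

```java
import java.util.List;

// Hypothetical sketch of the single-loop suggestion; AggInfo is a stand-in, not Impala's AggregateInfo.
final class AggCostSketch {
  static final class AggInfo {
    final long inputCardinality;
    final double groupingExprCost;
    final double aggregateExprCost;

    AggInfo(long inputCardinality, double groupingExprCost, double aggregateExprCost) {
      this.inputCardinality = inputCardinality;
      this.groupingExprCost = groupingExprCost;
      this.aggregateExprCost = aggregateExprCost;
    }
  }

  // Accumulates D = I * (C_grouping + C_aggregate) per AggInfo in one pass,
  // with no intermediate list of per-AggInfo costs.
  static double totalCost(List<AggInfo> aggInfos) {
    double total = 0.0;
    for (AggInfo info : aggInfos) {
      total += info.inputCardinality * (info.groupingExprCost + info.aggregateExprCost);
    }
    return total;
  }
}
```

For two AggInfos with (I, C_grouping, C_aggregate) = (100, 1, 2) and (10, 1, 1), the sum is 300 + 20 = 320.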
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Qifan Chen has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 52: (25 comments) The implementation of the algorithm looks good! If the specific checks on JOIN and UNION can be generalized at the PlanNode level, it would be great. I also wonder if the caching of costs/cores used to compute the total is necessary. The producer-consumer-ratio-based adjustment is fine and could be improved further under the guidance of a plan node in the future. http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG Commit Message: http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG@171 PS51, Line 171: : In bottom-up direction, there exist four segments in F03: : Blocking segment 1: (11:EXCHANGE, 12:AGGREGATE) : Blocking segment 2: 06 > Done. I rather keep calling segment 4 as non-blocking segment since only Jo Done http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG@205 PS51, Line 205: > No, SegmentCost is modelled as tree. Done http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/MultipleProcessingCost.java File fe/src/main/java/org/apache/impala/planner/MultipleProcessingCost.java: http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/MultipleProcessingCost.java@24 PS52, Line 24: MultipleProcessingCost nit. Maybe ScaledProcessingCost? MultipleProcessingCost sounds like a list of processing costs to me. http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/MultipleProcessingCost.java@32 PS52, Line 32: > 0 >=0. http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/MultipleProcessingCost.java@61 PS52, Line 61: MultCost ScaledCost()?
http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/PlanFragment.java File fe/src/main/java/org/apache/impala/planner/PlanFragment.java: http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/PlanFragment.java@151 PS52, Line 151: ProcessingCost, List nit. May need to add a comment for the first and the second. I also wonder if we need to cache the list, once we have the total cost (the sum over the segment subtree). http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/PlanFragment.java@155 PS52, Line 155: CoreCount, List same as above. http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/ProcessingCost.java File fe/src/main/java/org/apache/impala/planner/ProcessingCost.java: http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/ProcessingCost.java@254 PS52, Line 254: consProdRatio consumerProducerRatio http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/ProcessingCost.java@270 PS52, Line 270: > >= http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/SegmentCost.java File fe/src/main/java/org/apache/impala/planner/SegmentCost.java: http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/SegmentCost.java@48 PS52, Line 48: SegmentCost Maybe CostingSegment? http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/SegmentCost.java@54 PS52, Line 54: // The ProcessingCost of this fragment segment. nit. which is the sum of the processing cost of all nodes in nodes_. http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/SegmentCost.java@78 PS52, Line 78: 1 I wonder why 0 is being excluded. A node consuming or producing 0 rows is perfectly fine. 
http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/SegmentCost.java@79 PS52, Line 79: 1 same as above. http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/SegmentCost.java@83 PS52, Line 83: appendSinkCost appendSink() http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/SegmentCost.java@102 PS52, Line 102: subtreeCostBuilder I wonder if we need to remember the costs used to compute the max cost for the tree rooted at the current segment. Once the post-order traversal is done, the sum is known. These sums can be used to compute the # of cores. Maybe I missed something? http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/SegmentCost.java@103 PS52, Line 103: segmentCost nit. maxCost http://gerrit.cloudera.org:8080/#/c/19033/52/fe/src/main/java/org/apache/impala/planner/SegmentCost.java@105 PS52, Line 105: . gather the costs of the children first and find the max cost among them.
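Qifan's question about caching can be illustrated with a small post-order traversal that computes the maximum per-segment cost of a subtree on the fly, without storing a list of child costs. Segment and its fields are hypothetical; this is a sketch of the reviewer's idea, not SegmentCost's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical segment tree; Segment and its fields are illustrative, not SegmentCost's real API.
final class SegmentTreeSketch {
  static final class Segment {
    final long cost;
    final List<Segment> children = new ArrayList<>();

    Segment(long cost) {
      this.cost = cost;
    }

    Segment addChild(Segment child) {
      children.add(child);
      return this;
    }
  }

  // Post-order: visit all children first, then record the current segment.
  static void postOrder(Segment s, List<Long> out) {
    for (Segment child : s.children) postOrder(child, out);
    out.add(s.cost);
  }

  // Largest per-segment cost in the subtree rooted at s, computed bottom-up in one
  // post-order pass; no per-node list of child costs is retained.
  static long maxSegmentCost(Segment s) {
    long max = s.cost;
    for (Segment child : s.children) max = Math.max(max, maxSegmentCost(child));
    return max;
  }
}
```

The recursion returns each subtree's result directly to its parent, which is why no cached list is needed once the traversal finishes.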
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 52: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/12438/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- To view, visit http://gerrit.cloudera.org:8080/19033 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 52 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Fri, 24 Feb 2023 20:57:30 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 52: (25 comments) ps52 is a rebase and addresses some comments from ps51. http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG Commit Message: http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG@151 PS51, Line 151: > nit plan fragment, which is blocking since it has 3 blocking PlanNode: Done http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG@171 PS51, Line 171: : In bottom-up direction, there exist four segments in F03: : Blocking segment 1: (11:EXCHANGE, 12:AGGREGATE) : Blocking segment 2: 06 > indentation and with some additional info as follows. Done. I would rather keep calling segment 4 a non-blocking segment, since only JoinBuildSink is considered a blocking DataSink. This correlates with the definition of a blocking fragment: every fragment has a DataSink, but a fragment with neither a blocking PlanNode nor a blocking DataSink is not a blocking fragment. http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG@177 PS51, Line 177: : Therefore we have: : PC(segment 1) = 426337+34548320 : PC(segment 2) = > indentation Done http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG@182 PS51, Line 182: PC(segment 4) = 22 > nit a Done http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG@183 PS51, Line 183: : These per-segment costs stored in a SegmentCost tree rooted at : PlanFragment.rootSegment_, and ar > nit. , and are [34974657, 2159270, 23752870, 22] respectively after the pos Done http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG@189 PS51, Line 189: PlanFragment.collectSegmentCostHelper(). > I think "Output ProcessingCost" should be really called "Total Processing c Removed this paragraph and now refer to the cost directly as "the last segment's ProcessingCost". I only consider the last segment, rather than the total over all segments in the fragment, to anticipate a burst exchange scenario.
For example, a fragment that only aggregates may spend a long time in aggregation. But when it is ready to send rows upward, the receiver fragment above it should have a similar EDoP to keep up with the sender. http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG@196 PS51, Line 196: hat fragment by compa > nit. effective degree of parallelism (EDoP). We can use EDoP in the rest of Done http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG@199 PS51, Line 199: UnionNode, or : ScanNode will > the costing algorithm attempts to adjust the number of Done http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG@201 PS51, Line 201: > see the previous comment. Done http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG@205 PS51, Line 205: > Assume that segments are modeled as a list in a plan fragment (true?) in th No, SegmentCost is modelled as a tree. http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG@209 PS51, Line 209: in a similar post-order > EDoP Done http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG@261 PS51, Line 261: > nit suggest to remove since a query plan with a sink node, which is blockin Done http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG@263 PS51, Line 263: f a query or the query itself. Each blocking : subtree will > the intermediate or leaf nodes. Done http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG@268 PS51, Line 268: ample is [4, 4, > By reading this para, it seems CoreCount is a better name. Usually a requi Done http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG@269 PS51, Line 269: > nit remove Done http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG@286 PS51, Line 286: control the entire com > EDoP Done http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG@288 PS51, Line 288: m or not. > nit. remove Done http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG@288 PS51, Line 288:Control whether to enable this CPU costing algorithm or not.
> set Done http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG@292 PS51, Line 292: instances (threads) that > the entire computation of EDoP. Done http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG@300 PS51, Line 300: ount of > computing Done http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG@301 PS51, Line 301: is suggested to keep this to :false until the min_processing_per_thread backend fl > I strongly suggest that we introduce PROCESSING_COST_MIN_THREADS in this pa Ack. Will take a look at this. http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG@304 PS51, Line 304: : This patch also adds three backend flags to tune the algorithm. : 1. query_cpu_count_divisor :Divide the CPU requirement of a
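The description of query_cpu_count_divisor is cut off above, but its first words say the flag divides a CPU requirement. A minimal sketch, assuming the result is rounded up to a whole core count (an assumption, not confirmed by the text); the class name is hypothetical. The checks exercise the 0.5 and 2.0 divisors that Wenzhe asked to have tested.

```java
// Hypothetical sketch of query_cpu_count_divisor. Rounding up is an assumption,
// since the flag description is truncated after "Divide the CPU requirement of a".
final class CpuCountDivisorSketch {
  static int adjustedCoreCount(int coreRequirement, double divisor) {
    // Reject zero, negative, and NaN divisors.
    if (!(divisor > 0.0)) {
      throw new IllegalArgumentException("divisor must be positive");
    }
    return (int) Math.ceil(coreRequirement / divisor);
  }
}
```

Under this reading, a divisor below 1.0 inflates the requirement (10 cores become 20 with 0.5) and a divisor above 1.0 shrinks it (10 cores become 5 with 2.0).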
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has uploaded a new patch set (#52) to the change originally created by Qifan Chen. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage ..

IMPALA-11604 Planner changes for CPU usage

This patch augments IMPALA-10992 by establishing an infrastructure that allows the weighted total amount of data to process to be used as a new factor in the definition and selection of an executor group.

At the basis of the CPU costing model, we define ProcessingCost as the cost for a distinct PlanNode / DataSink / PlanFragment to process its input rows globally across all of its instances. The costing algorithm then tries to adjust the number of instances for each fragment by considering their production-consumption ratio, and finally returns a number representing the ideal CPU core count required for a query to run efficiently. A more detailed explanation of the CPU costing algorithm is given in the four steps below.

I. Compute ProcessingCost for each plan node and data sink.

ProcessingCost of a PlanNode/DataSink is a weighted amount of data processed by that node/sink. The basic ProcessingCost is computed with the following general formula.

  ProcessingCost is a pair: PC(D, N), where D = I * (C + M)

  where D is the weighted amount of data processed
        I is the input cardinality
        C is the expression evaluation cost per row, set to the total weight of expression evaluation in the node/sink
        M is the materialization cost per row; only used by scan and exchange nodes, otherwise 0
        N is the number of instances; defaults to D / MIN_COST_PER_THREAD (1 million), but is fixed for certain nodes/sinks and adjustable in step III

In this patch, the weight of each expression evaluation is set to a constant of 1.

A description of the computation for each kind of PlanNode/DataSink is given below.

  01. AggregationNode: Each AggregateInfo has its C set to the sum of its grouping expression and aggregate expression costs, and is assigned its own ProcessingCost. These ProcessingCosts are then summed to form the AggregationNode's ProcessingCost;
  02. AnalyticEvalNode: C is the sum of the evaluation costs for analytic functions;
  03. CardinalityCheckNode: Use the general formula, I = 1;
  04. DataSourceScanNode: Follow the formula from the superclass ScanNode;
  05. EmptySetNode: I = 0;
  06. ExchangeNode: M = (average serialized row size) / 1024. In broadcast mode the general formula is modified to D = D * number of receivers;
  07. HashJoinNode:
        probe cost = PC(I0 * C(equiJoin predicate), N) + PC(output cardinality * C(otherJoin predicate), N)
        build cost = PC(I1 * C(equiJoin predicate), N)
      With I0 and I1 as the input cardinality of the probe and build side respectively. If the plan node does not have a separate build, ProcessingCost is the sum of probe cost and build cost. Otherwise, ProcessingCost is equal to the probe cost.
  08. HbaseScanNode, HdfsScanNode, and KuduScanNode: Follow the formula from the superclass ScanNode;
  09. Nested loop join node:
      When the right child is not a SingularRowSrcNode:
        probe cost = PC(I0 * C(equiJoin predicate), N) + PC(output cardinality * C(otherJoin predicate), N)
        build cost = PC(I1 * C(equiJoin predicate), N)
      When the right child is a SingularRowSrcNode:
        probe cost = PC(I0, N)
        build cost = PC(I0 * I1, N)
      With I0 and I1 as the input cardinality of the probe and build side respectively. If the plan node does not have a separate build, ProcessingCost is the sum of probe cost and build cost. Otherwise, ProcessingCost is equal to the probe cost.
  10. ScanNode: M = (average row size) / 1024;
  11. SelectNode: Use the general formula;
  12. SingularRowSrcNode: Since the node is involved once per input in a nested loop join, the contribution of this node is computed in the nested loop join;
  13. SortNode: C is the evaluation cost for the sort expression;
  14. SubplanNode: C is 1. I is the product of the cardinalities of the left and right children;
  15. Union node: C is the cost of result expression evaluation from all non-pass-through children;
  16. Unnest node: I is the cardinality of the containing SubplanNode and C is 1;
  17. DataStreamSink: M = 1 / num rows per batch;
  18. JoinBuildSink: ProcessingCost is the build cost of its associated JoinNode;
  19. PlanRootSink: If result spooling is enabled, C is the cost of output expression evaluation. Otherwise, ProcessingCost is zero;
  20. TableSink: C is the cost of output expression evaluation. TableSink subclasses (including HBaseTableSink, HdfsTableSink, and KuduTableSink) follow the same formula;

II. Compute the total ProcessingCost of a
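The general formula in step I can be sketched in Java as below. GeneralCostSketch is hypothetical; rounding N up and clamping it to at least 1 are assumptions, since the message only says N defaults to D / MIN_COST_PER_THREAD. The ScanNode materialization cost (item 10) and the broadcast-exchange multiplier (item 06) are included so a worked example can be computed.

```java
// Minimal sketch of the general formula PC(D, N) with D = I * (C + M).
// Names are illustrative, not Impala's ProcessingCost API.
final class GeneralCostSketch {
  // MIN_COST_PER_THREAD as stated in the commit message (1 million).
  static final double MIN_COST_PER_THREAD = 1_000_000.0;

  // D = I * (C + M)
  static double weightedData(long inputCardinality, double exprCostPerRow,
                             double materializationCostPerRow) {
    return inputCardinality * (exprCostPerRow + materializationCostPerRow);
  }

  // Default N = D / MIN_COST_PER_THREAD; ceil and the floor of 1 are assumptions.
  static int defaultInstances(double weightedData) {
    return (int) Math.max(1.0, Math.ceil(weightedData / MIN_COST_PER_THREAD));
  }

  // Item 10, ScanNode: M = (average row size) / 1024.
  static double scanMaterializationCost(double avgRowSizeBytes) {
    return avgRowSizeBytes / 1024.0;
  }

  // Item 06, ExchangeNode in broadcast mode: D = D * number of receivers.
  static double broadcastExchangeData(double d, int numReceivers) {
    return d * numReceivers;
  }
}
```

For example, a scan of 3,000,000 rows with C = 2 and an average row size of 512 bytes gives M = 0.5, D = 7,500,000 and a default N of 8; broadcasting that data to 3 receivers would triple D.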
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Qifan Chen has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 51: (26 comments) Looks very good! I will look at the code corresponding to section II, III and IV this weekend. Can you please also confirm that segments are still modeled as a list within a fragment? How hard is it to model as a tree? Personally I think it is very important that all operators can participate in EDoP adjustment to make this feature sound. http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG Commit Message: http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG@151 PS51, Line 151: fragment plan nit plan fragment, which is blocking since it has 3 blocking PlanNode: 12:AGGREGATE, 06:SORT, and 08:TOP-N. http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG@171 PS51, Line 171: 1. (11:EXCHANGE, 12:AGGREGATE) : 2. 06:SORT : 3. (07:ANALYTIC, 08:TOP-N) : 4. DataStreamSink of F03 indentation and with some additional info as follows. Blocking segment 1: (11:EXCHANGE, 12:AGGREGATE) Blocking segment 2: 06:SORT Blocking segment 3: (07:ANALYTIC, 08:TOP-N) Non-blocking segment 4: DataStreamSink of F03 I also wonder if segment 4 should be a blocking one since by the definition above any DataSink is blocking. http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG@177 PS51, Line 177: PC(segment 1) = 426337+34548320 : PC(segment 2) = 2159270 : PC(segment 3) = 23751970+900 : PC(segment 4) = 22 indentation http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG@182 PS51, Line 182: These per-segment costs stored in SegmentCost tree rooted at nit a http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG@183 PS51, Line 183: . In this example, post-order traversal of : rootSegment_ will show their associated cost as: : [34974657, 2159270, 23752870, 22] nit. , and are [34974657, 2159270, 23752870, 22] respectively after the post-order traversal. 
http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG@186 PS51, Line 186: F03 is also a blocking fragment since it has 3 blocking PlanNode: : 12:AGGREGATE, 06:SORT, and 08:TOP-N. remove, as the info is described above. http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG@189 PS51, Line 189: A rootSegment_ is also called an Output ProcessingCost I think "Output ProcessingCost" should be really called "Total Processing cost", since it takes some cost for a fragment to output rows (not cost). http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG@196 PS51, Line 196: effective parallelism nit. effective degree of parallelism (EDoP). We can use EDoP in the rest of the text. http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG@199 PS51, Line 199: algorithms will : try to adjust the costing algorithm attempts to adjust the number of http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG@201 PS51, Line 201: Output see the previous comment. http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG@205 PS51, Line 205: . Assume that segments are modeled as a list in a plan fragment (true?) in this patch, we should append the following here: , since segments are modeled as a list in a plan fragment . http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG@209 PS51, Line 209: the effective parallelism EDoP http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG@261 PS51, Line 261: several nit suggest to remove since a query plan with a sink node, which is blocking node, at the root maps to one blocking subtree. http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG@263 PS51, Line 263: leaves. All other fragments in the subtree are : non-blocking the intermediate or leaf nodes. http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG@268 PS51, Line 268: CoreRequirement By reading this para, it seems CoreCount is a better name. Usually a requirement in SQL compiler means something not solid, such as ANY, NOT SINGLE, etc. 
http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG@269 PS51, Line 269: certain nit remove http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG@286 PS51, Line 286: effective parallelism EDoP http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG@288 PS51, Line 288: executor group to determine if it fits to run in that executor group set set http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG@288 PS51, Line 288: executor group nit. remove http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG@292 PS51, Line 292: this CPU costing algorithm the entire computation of EDoP. http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG@300 PS51, Line 300: reducing computing http://gerrit.cloudera.org:8080/#/c/19033/51//COMMIT_MSG@301 PS51, Line 301: Currently, there is no option to
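The F03 numbers quoted in this review can be checked with simple arithmetic. The sketch below (class name hypothetical) sums each segment's members, lists the results in the post-order given in the commit message, and contrasts the total over all segments with the last segment's cost, which Riza's Patch Set 52 reply says is the cost the algorithm uses.

```java
// Worked check of the F03 example. Member costs are copied from the commit-message
// excerpt; the segment membership comments follow the listing in the review.
final class F03SegmentCosts {
  // Per-segment costs in post-order (segments 1..4).
  static long[] segmentCosts() {
    long segment1 = 426337L + 34548320L; // (11:EXCHANGE, 12:AGGREGATE)
    long segment2 = 2159270L;            // 06:SORT
    long segment3 = 23751970L + 900L;    // (07:ANALYTIC, 08:TOP-N)
    long segment4 = 22L;                 // DataStreamSink of F03
    return new long[] {segment1, segment2, segment3, segment4};
  }

  // Sum over all segments in the fragment.
  static long total(long[] costs) {
    long sum = 0;
    for (long c : costs) sum += c;
    return sum;
  }

  // Last segment in post-order: the one holding the fragment's DataStreamSink.
  static long last(long[] costs) {
    return costs[costs.length - 1];
  }
}
```

The per-segment results match the quoted [34974657, 2159270, 23752870, 22]; their total is 60,886,819, while the last segment costs only 22.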
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 51: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/12421/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- To view, visit http://gerrit.cloudera.org:8080/19033 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 51 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Wed, 22 Feb 2023 18:23:20 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 50: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/12420/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- To view, visit http://gerrit.cloudera.org:8080/19033 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 50 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Wed, 22 Feb 2023 18:17:29 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 51: (4 comments) http://gerrit.cloudera.org:8080/#/c/19033/48//COMMIT_MSG Commit Message: http://gerrit.cloudera.org:8080/#/c/19033/48//COMMIT_MSG@325 PS48, Line 325: sing_per_th > Yes, turn back to false after tweaking the numbers. Done http://gerrit.cloudera.org:8080/#/c/19033/49/be/src/util/backend-gflag-util.cc File be/src/util/backend-gflag-util.cc: http://gerrit.cloudera.org:8080/#/c/19033/49/be/src/util/backend-gflag-util.cc@213 PS49, Line 213: processing load > what's unit of processing load? Bytes? It is in processing-cost units. Clarified the description in ps50. http://gerrit.cloudera.org:8080/#/c/19033/49/fe/src/main/java/org/apache/impala/planner/ProcessingCost.java File fe/src/main/java/org/apache/impala/planner/ProcessingCost.java: http://gerrit.cloudera.org:8080/#/c/19033/49/fe/src/main/java/org/apache/impala/planner/ProcessingCost.java@20 PS49, Line 20: com.google.common.base.Preconditions; > Does not look like the right class to import. Done http://gerrit.cloudera.org:8080/#/c/19033/49/fe/src/main/java/org/apache/impala/planner/ProcessingCost.java@139 PS49, Line 139: l int finalProducerParal > why need to define this variable? I didn't have this at first and used maxProducerParallelism directly, but I got an error from my IDE: "Local variable maxProducerParallelism defined in an enclosing scope must be final or effectively final". Added the final modifier in ps50.
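The IDE error quoted above comes from Java's rule that a lambda may only capture local variables that are final or effectively final. A minimal illustration of the common workaround, copying the reassigned value into a final local before capturing it; the class, method, and capturedMax names are hypothetical, and only maxProducerParallelism comes from the comment.

```java
import java.util.function.IntSupplier;

// Illustrates the lambda-capture rule behind the IDE error; surrounding logic is hypothetical.
final class EffectivelyFinalSketch {
  static IntSupplier maxProducerParallelismSupplier(int[] producerParallelisms) {
    int maxProducerParallelism = 1;
    for (int p : producerParallelisms) {
      // Reassignment here means maxProducerParallelism is not effectively final.
      maxProducerParallelism = Math.max(maxProducerParallelism, p);
    }
    // "return () -> maxProducerParallelism;" would not compile, so copy the value
    // into a final local and capture that instead.
    final int capturedMax = maxProducerParallelism;
    return () -> capturedMax;
  }
}
```

An alternative is to compute the maximum in a helper method and assign its result once to a final local, which avoids the extra variable.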
-- To view, visit http://gerrit.cloudera.org:8080/19033 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 51 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Wed, 22 Feb 2023 18:03:21 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has uploaded a new patch set (#51) to the change originally created by Qifan Chen. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. IMPALA-11604 Planner changes for CPU usage
This patch augments IMPALA-10992 by establishing an infrastructure that allows the weighted total amount of data to process to be used as a new factor in the definition and selection of an executor group. At the basis of the CPU costing model, we define ProcessingCost as the cost for a distinct PlanNode / DataSink / PlanFragment to process its input rows globally across all of its instances. The costing algorithm then tries to adjust the number of instances for each fragment by considering their production-consumption ratio, and finally returns a number representing the ideal CPU core count required for a query to run efficiently. A more detailed explanation of the CPU costing algorithm can be found in the four steps below.
I. Compute ProcessingCost for each plan node and data sink.
The ProcessingCost of a PlanNode/DataSink is the weighted amount of data processed by that node/sink. The basic ProcessingCost is computed with the following general formula. ProcessingCost is a pair PC(D, N), where D = I * (C + M), and:
  D is the weighted amount of data processed;
  I is the input cardinality;
  C is the expression evaluation cost per row, set to the total weight of expression evaluation in the node/sink;
  M is the materialization cost per row, used only by scan and exchange nodes and by DataStreamSink (see item 17), and 0 otherwise;
  N is the number of instances. It defaults to D / MIN_COST_PER_THREAD (1 million), but is fixed for certain nodes/sinks and adjustable in step III.
In this patch, the weight of each expression evaluation is set to a constant of 1. The computation for each kind of PlanNode/DataSink is described below.
01. AggregationNode: For each AggregateInfo, C is the sum of the evaluation costs of its grouping and aggregate expressions, and each AggregateInfo is assigned its own ProcessingCost. These ProcessingCosts are then summed to form the AggregationNode's ProcessingCost;
02. AnalyticEvalNode: C is the sum of the evaluation costs of the analytic functions;
03. CardinalityCheckNode: Use the general formula with I = 1;
04. DataSourceScanNode: Follow the formula from the superclass ScanNode;
05. EmptySetNode: I = 0;
06. ExchangeNode: M = (average serialized row size) / 1024. In broadcast mode, the general formula is modified to D = D * (number of receivers);
07. HashJoinNode: probe cost = PC(I0 * C(equiJoin predicate), N) + PC(output cardinality * C(otherJoin predicate), N); build cost = PC(I1 * C(equiJoin predicate), N), where I0 and I1 are the input cardinalities of the probe and build sides, respectively. If the plan node does not have a separate build, ProcessingCost is the sum of the probe cost and the build cost. Otherwise, ProcessingCost equals the probe cost;
08. HBaseScanNode, HdfsScanNode, and KuduScanNode: Follow the formula from the superclass ScanNode;
09. Nested loop join node: When the right child is not a SingularRowSrcNode: probe cost = PC(I0 * C(equiJoin predicate), N) + PC(output cardinality * C(otherJoin predicate), N); build cost = PC(I1 * C(equiJoin predicate), N). When the right child is a SingularRowSrcNode: probe cost = PC(I0, N); build cost = PC(I0 * I1, N). I0 and I1 are the input cardinalities of the probe and build sides, respectively. If the plan node does not have a separate build, ProcessingCost is the sum of the probe cost and the build cost. Otherwise, ProcessingCost equals the probe cost;
10. ScanNode: M = (average row size) / 1024;
11. SelectNode: Use the general formula;
12. SingularRowSrcNode: Since this node is involved once per input row in a nested loop join, its contribution is computed as part of the nested loop join;
13. SortNode: C is the evaluation cost of the sort expression;
14. SubplanNode: C is 1. I is the product of the cardinalities of the left and right children;
15. Union node: C is the cost of result expression evaluation from all non-pass-through children;
16. Unnest node: I is the cardinality of the containing SubplanNode, and C is 1;
17. DataStreamSink: M = 1 / (number of rows per batch);
18. JoinBuildSink: ProcessingCost is the build cost of its associated JoinNode;
19. PlanRootSink: If result spooling is enabled, C is the cost of output expression evaluation. Otherwise, ProcessingCost is zero;
20. TableSink: C is the cost of output expression evaluation. TableSink subclasses (including HBaseTableSink, HdfsTableSink, and KuduTableSink) follow the same formula;
II. Compute the total ProcessingCost of a
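A minimal Java sketch of the step I formulas, assuming plain doubles for cardinalities and costs. The class and method names are hypothetical and are not Impala's ProcessingCost API. Rounding N up with a floor of 1 is also an assumption, since the text only says that N defaults to D / MIN_COST_PER_THREAD.

```java
// Hypothetical sketch of step I; names are illustrative, not Impala's actual classes.
final class ProcessingCostSketch {
  // Default per-thread threshold quoted in the commit message ("1 million").
  static final double MIN_COST_PER_THREAD = 1_000_000.0;

  // General formula: D = I * (C + M).
  static double weightedData(double i, double c, double m) {
    return i * (c + m);
  }

  // Default N: assumed to be D / MIN_COST_PER_THREAD rounded up, with at least 1 instance.
  static long defaultInstances(double d) {
    return Math.max(1L, (long) Math.ceil(d / MIN_COST_PER_THREAD));
  }

  // Item 10 (ScanNode): M = (average row size) / 1024; C is the per-row expression cost.
  static double scanCost(double cardinality, double exprCost, double avgRowSizeBytes) {
    return weightedData(cardinality, exprCost, avgRowSizeBytes / 1024.0);
  }

  // Item 06 (ExchangeNode): M = (average serialized row size) / 1024,
  // and D is multiplied by the number of receivers in broadcast mode.
  static double exchangeCost(double cardinality, double exprCost,
      double avgSerializedRowSizeBytes, boolean broadcast, int numReceivers) {
    double d = weightedData(cardinality, exprCost, avgSerializedRowSizeBytes / 1024.0);
    return broadcast ? d * numReceivers : d;
  }

  // Item 07 (HashJoinNode): the node's own cost includes the build cost only
  // when the build is not separated into a JoinBuildSink.
  static double hashJoinCost(double probeCard, double buildCard, double outputCard,
      double eqJoinPredCost, double otherJoinPredCost, boolean separateBuild) {
    double probe = probeCard * eqJoinPredCost + outputCard * otherJoinPredCost;
    double build = buildCard * eqJoinPredCost;
    return separateBuild ? probe : probe + build;
  }

  public static void main(String[] args) {
    // 10M input rows, 2 expressions of weight 1, 512-byte rows => M = 0.5.
    double d = scanCost(10_000_000, 2, 512);
    System.out.println(d + " -> " + defaultInstances(d) + " instances");
  }
}
```

Keeping each node kind as a separate small function mirrors the per-node list above; summing per-node D values into fragment totals is left to the later steps.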
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has uploaded a new patch set (#50) to the change originally created by Qifan Chen. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage ..
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Wenzhe Zhou has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 49: (3 comments) http://gerrit.cloudera.org:8080/#/c/19033/48//COMMIT_MSG Commit Message: http://gerrit.cloudera.org:8080/#/c/19033/48//COMMIT_MSG@325 PS48, Line 325: sing_per_th > My thought as well. Should we revert the default back to True? Yes, turn it back to false after tweaking the numbers. http://gerrit.cloudera.org:8080/#/c/19033/49/be/src/util/backend-gflag-util.cc File be/src/util/backend-gflag-util.cc: http://gerrit.cloudera.org:8080/#/c/19033/49/be/src/util/backend-gflag-util.cc@213 PS49, Line 213: processing load What's the unit of processing load? Bytes? http://gerrit.cloudera.org:8080/#/c/19033/49/fe/src/main/java/org/apache/impala/planner/ProcessingCost.java File fe/src/main/java/org/apache/impala/planner/ProcessingCost.java: http://gerrit.cloudera.org:8080/#/c/19033/49/fe/src/main/java/org/apache/impala/planner/ProcessingCost.java@139 PS49, Line 139: finalProducerParallelism Why do we need to define this variable? -- To view, visit http://gerrit.cloudera.org:8080/19033 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 49 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Tue, 21 Feb 2023 22:34:33 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 49: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/12390/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- To view, visit http://gerrit.cloudera.org:8080/19033 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 49 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Fri, 17 Feb 2023 00:33:24 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 49: (9 comments) http://gerrit.cloudera.org:8080/#/c/19033/43//COMMIT_MSG Commit Message: http://gerrit.cloudera.org:8080/#/c/19033/43//COMMIT_MSG@213 PS43, Line 213: overlapping between fragment execution and blocking operators. We > The upper bound for each fragment should be the number of threads or someth min_processing_per_thread=10M seems to be a good upper bound on my local machine. http://gerrit.cloudera.org:8080/#/c/19033/47//COMMIT_MSG Commit Message: http://gerrit.cloudera.org:8080/#/c/19033/47//COMMIT_MSG@140 PS47, Line 140: a subtree of PlanNodes/DataSink in the fragment with a DataSink or Added a SegmentCost class for the segment abstraction. Also added TPCDS-Q49 to tpcds-processing-cost.test to test against a union fragment. http://gerrit.cloudera.org:8080/#/c/19033/48//COMMIT_MSG Commit Message: http://gerrit.cloudera.org:8080/#/c/19033/48//COMMIT_MSG@325 PS48, Line 325: sing_per_th > As the comments in https://github.com/apache/impala/blob/master/fe/src/main My thought as well. Should we revert the default back to True? http://gerrit.cloudera.org:8080/#/c/19033/48//COMMIT_MSG@346 PS48, Line 346: > Could you attach the bench mark which show effective parallelism improvemen Our single-node benchmark script mainly measures query latency. I don't expect lower query latency with this patch, since the default combination of all new query options and backend flags will actually reduce parallelism in some fragments rather than increase it. As long as latency does not regress severely compared to regular MT_DOP mode, I take it as a good outcome. The improvement is probably best expressed as a reduction in memory and thread count.
http://gerrit.cloudera.org:8080/#/c/19033/43/be/src/util/backend-gflag-util.cc File be/src/util/backend-gflag-util.cc: http://gerrit.cloudera.org:8080/#/c/19033/43/be/src/util/backend-gflag-util.cc@210 PS43, Line 210: > Would rather keep this as cost as cost of a row is a highly variable metric Renamed to min_processing_per_thread in ps49. http://gerrit.cloudera.org:8080/#/c/19033/43/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java File fe/src/main/java/org/apache/impala/planner/ExchangeNode.java: http://gerrit.cloudera.org:8080/#/c/19033/43/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java@263 PS43, Line 263: // Returns the total estimated size (in bytes) of the row batch queues by > This assume the total cost for a row batch is 1. Is it right estimation? Changed in ps49 to model the cost as 1 per 1KB of average serialized row size. That seems good enough to increase the DataStreamSink and ExchangeNode costs. http://gerrit.cloudera.org:8080/#/c/19033/48/fe/src/main/java/org/apache/impala/planner/Planner.java File fe/src/main/java/org/apache/impala/planner/Planner.java: http://gerrit.cloudera.org:8080/#/c/19033/48/fe/src/main/java/org/apache/impala/planner/Planner.java@470 PS48, Line 470: ot = postOrderFra > nit: this result seems not used now. Add "TODO" comment Done http://gerrit.cloudera.org:8080/#/c/19033/49/fe/src/main/java/org/apache/impala/planner/ProcessingCost.java File fe/src/main/java/org/apache/impala/planner/ProcessingCost.java: http://gerrit.cloudera.org:8080/#/c/19033/49/fe/src/main/java/org/apache/impala/planner/ProcessingCost.java@20 PS49, Line 20: com.google.cloud.hadoop.repackaged.gcs.com.google.common.math.LongMath This does not look like the right class to import.
http://gerrit.cloudera.org:8080/#/c/19033/48/fe/src/main/java/org/apache/impala/planner/ScanNode.java File fe/src/main/java/org/apache/impala/planner/ScanNode.java: http://gerrit.cloudera.org:8080/#/c/19033/48/fe/src/main/java/org/apache/impala/planner/ScanNode.java@359 PS48, Line 359: > In ExchangeNode.estimateProcessingCostPerRow(), the cost per row is calcula Changed in ps49 to model the cost as 1 per 1KB of average row size. -- To view, visit http://gerrit.cloudera.org:8080/19033 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 49 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Fri, 17 Feb 2023 00:32:47 + Gerrit-HasComments: Yes
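The thresholds discussed above can be illustrated with a small sketch that turns a fragment's total cost into an instance count. It assumes ceiling division by min_processing_per_thread plus a cap standing in for the thread count that was suggested as the per-fragment upper bound; InstanceCountSketch and its parameters are hypothetical. With Guava on the classpath, the same rounding is available as LongMath.divide(totalCost, minProcessingPerThread, RoundingMode.CEILING) from com.google.common.math, rather than the repackaged GCS-connector copy flagged above.

```java
// Hypothetical sketch: derive a fragment's instance count from its total cost.
// minProcessingPerThread mirrors the min_processing_per_thread flag named in the review;
// maxInstances stands in for a thread/core cap and is an assumption.
final class InstanceCountSketch {
  // Ceiling division for non-negative longs, avoiding the overflow risk of (a + b - 1) / b.
  static long ceilDiv(long a, long b) {
    return a / b + (a % b == 0 ? 0 : 1);
  }

  static long instancesFor(long totalCost, long minProcessingPerThread, long maxInstances) {
    if (minProcessingPerThread <= 0 || maxInstances <= 0) {
      throw new IllegalArgumentException("thresholds must be positive");
    }
    long wanted = ceilDiv(Math.max(0L, totalCost), minProcessingPerThread);
    // At least one instance, never more than the cap.
    return Math.min(maxInstances, Math.max(1L, wanted));
  }

  public static void main(String[] args) {
    long tenMillion = 10_000_000L;
    System.out.println(instancesFor(95_000_000L, tenMillion, 12));  // 10
    System.out.println(instancesFor(500_000_000L, tenMillion, 12)); // capped at 12
  }
}
```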
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has uploaded a new patch set (#49) to the change originally created by Qifan Chen. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage ..
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Wenzhe Zhou has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 48: (1 comment) http://gerrit.cloudera.org:8080/#/c/19033/48/fe/src/main/java/org/apache/impala/planner/Planner.java File fe/src/main/java/org/apache/impala/planner/Planner.java: http://gerrit.cloudera.org:8080/#/c/19033/48/fe/src/main/java/org/apache/impala/planner/Planner.java@470 PS48, Line 470: blockingAwareCost nit: this result seems not used now. Add "TODO" comment -- To view, visit http://gerrit.cloudera.org:8080/19033 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 48 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Wed, 15 Feb 2023 21:00:49 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Wenzhe Zhou has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 48: (4 comments) http://gerrit.cloudera.org:8080/#/c/19033/48//COMMIT_MSG Commit Message: http://gerrit.cloudera.org:8080/#/c/19033/48//COMMIT_MSG@325 PS48, Line 325: IMPALA-2805 As the comments in https://github.com/apache/impala/blob/master/fe/src/main/java/org/apache/impala/analysis/Expr.java#L79-L81 indicate, the relative costs defined in IMPALA-2805 seem to be inaccurate. We may need to tune the numbers a little. http://gerrit.cloudera.org:8080/#/c/19033/48//COMMIT_MSG@346 PS48, Line 346: Testing: Could you attach the benchmark that shows the effective parallelism improvement? http://gerrit.cloudera.org:8080/#/c/19033/43/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java File fe/src/main/java/org/apache/impala/planner/ExchangeNode.java: http://gerrit.cloudera.org:8080/#/c/19033/43/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java@263 PS43, Line 263: return deferredBatchQueueSize; > I intended this to be a serialization/deserialization cost per row. This assumes the total cost for a row batch is 1. Is that the right estimate? http://gerrit.cloudera.org:8080/#/c/19033/48/fe/src/main/java/org/apache/impala/planner/ScanNode.java File fe/src/main/java/org/apache/impala/planner/ScanNode.java: http://gerrit.cloudera.org:8080/#/c/19033/48/fe/src/main/java/org/apache/impala/planner/ScanNode.java@359 PS48, Line 359: 1.0f / getRowBatchSize(queryOptions); In ExchangeNode.estimateProcessingCostPerRow(), the cost per row is calculated as 1 / (getRowBatchSize(queryOptions) / avg-row-size). Should we do the same?
-- To view, visit http://gerrit.cloudera.org:8080/19033 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 48 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Wed, 15 Feb 2023 19:39:52 + Gerrit-HasComments: Yes
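To compare the per-row cost formulas in this exchange, here is a hypothetical Java sketch; the class and method names are illustrative, and the batch size of 1024 rows is an assumption made for the example. Note that 1 / (batchSize / avgRowSize) simplifies to avgRowSize / batchSize. With a batch size of 1024, it therefore coincides numerically with the ps49 model of 1 cost unit per 1KB of average row size, whereas the PS48 ScanNode formula ignores row width entirely.

```java
// Hypothetical comparison of the per-row materialization costs discussed in this thread.
final class PerRowCostSketch {
  // PS48 ScanNode: 1.0f / getRowBatchSize(queryOptions); independent of row width.
  static double scanPs48(int rowBatchSize) {
    return 1.0 / rowBatchSize;
  }

  // ExchangeNode formula quoted by the reviewer: 1 / (rowBatchSize / avgRowSize).
  static double exchangeQuoted(int rowBatchSize, double avgRowSizeBytes) {
    return 1.0 / (rowBatchSize / avgRowSizeBytes);
  }

  // ps49 model: 1 cost unit per 1KB of average (serialized) row size.
  static double ps49(double avgRowSizeBytes) {
    return avgRowSizeBytes / 1024.0;
  }

  public static void main(String[] args) {
    int batch = 1024; // assumed batch size, for illustration only
    double[] rowSizes = {64, 512, 4096};
    for (double size : rowSizes) {
      System.out.printf("row=%.0fB scanPs48=%.6f exchange=%.4f ps49=%.4f%n",
          size, scanPs48(batch), exchangeQuoted(batch, size), ps49(size));
    }
  }
}
```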
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 48: (3 comments) http://gerrit.cloudera.org:8080/#/c/19033/47//COMMIT_MSG Commit Message: http://gerrit.cloudera.org:8080/#/c/19033/47//COMMIT_MSG@138 PS47, Line 138: The costing algorithm splits a query fragment into several segments : divided by blocking PlanNode/DataSink boundary. Each fragment segment is : a subtree of PlanNodes/DataSink in the fragment with a DataSink or > A list implies the linear structure among blocking segments. Not sure it ca That is actually a good point, thank you. Within a plan fragment, there are two possible branching points: Join and Union. For Join, the build is in a separate fragment, making the structure linear within the fragment. Union, however, is more complicated. I'll think more about this. http://gerrit.cloudera.org:8080/#/c/19033/47//COMMIT_MSG@154 PS47, Line 154: 100] > Yes, AGGREGATE of 12 is blocking. There is a reason to treat a DataSink the same as a PlanNode. When comparing the produce-consume rate with the fragment above it, we compare the output processing cost (segment 4 here) of the Producer fragment against the Consumer fragment cost. In this example, the whole of F03 seems slow because of its 3 blocking operators. However, above TOP-N, row transmission is fast, since only the DataStreamSink is active, serializing and transmitting RowBatches. Simply merging the DataStreamSink cost would cause the Consumer fragment (the fragment above F03) to assume that F03 is slow and transmits rows little by little at a steady rate. This can lead the Consumer fragment to mistakenly lower its parallelism, thinking that it can consume faster than the Producer below it can send rows. In reality, F03 may spend a long time until TOP-N completes and then quickly transmit all N rows upward.
The importance of this equal treatment is more apparent in the relationship between a pre-aggregation fragment and a final-aggregation fragment, such as F00 below this F03:
  12:AGGREGATE [FINALIZE]
  |
  11:EXCHANGE [HASH(i_class)]
  |
  F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=12
  05:AGGREGATE [STREAMING]
05:AGGREGATE may be slow to pre-aggregate, but once it completes, the row transmission by the DataStreamSink of F00 is fast. Merging the DataStreamSink cost of F00 into 05:AGGREGATE can mistakenly lower the parallelism of F03. http://gerrit.cloudera.org:8080/#/c/19033/43/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java File fe/src/main/java/org/apache/impala/planner/ExchangeNode.java: http://gerrit.cloudera.org:8080/#/c/19033/43/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java@263 PS43, Line 263: return deferredBatchQueueSize; > Still doesn't seem right to divide cost by row batch size. The compute cost I intended this to be a serialization/deserialization cost per row. -- To view, visit http://gerrit.cloudera.org:8080/19033 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 48 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Tue, 14 Feb 2023 21:23:35 + Gerrit-HasComments: Yes
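A toy Java sketch of the producer/consumer argument above. It assumes a hypothetical rule in which a consumer scales its default parallelism by consumerCost / producerOutputCost, capped at 1. This is not Impala's step III algorithm, and SegmentSketch and its methods are illustrative names. It only shows why folding a cheap DataStreamSink into a slow blocking segment can make the consumer pick far fewer instances.

```java
import java.util.List;

// Hypothetical illustration of the segment argument; not Impala's actual step III.
final class SegmentSketch {
  // Producer output cost when the DataStreamSink is its own (last) segment.
  static long outputCostSeparate(long sinkCost) {
    return sinkCost;
  }

  // Producer output cost when the sink cost is merged into the last blocking segment.
  static long outputCostMerged(List<Long> segmentCosts, long sinkCost) {
    return segmentCosts.get(segmentCosts.size() - 1) + sinkCost;
  }

  // Toy rule: the consumer scales down its default parallelism when it believes the
  // producer emits rows more slowly (higher output cost) than it can consume them.
  static int consumerInstances(long consumerCost, long producerOutputCost, int defaultInstances) {
    double ratio = Math.min(1.0, (double) consumerCost / Math.max(1L, producerOutputCost));
    return Math.max(1, (int) Math.ceil(defaultInstances * ratio));
  }

  public static void main(String[] args) {
    List<Long> producerSegments = List.of(900_000_000L); // e.g. a slow blocking TOP-N segment
    long sinkCost = 1_000_000L;                           // cheap transmission of the N rows
    long consumerCost = 50_000_000L;
    int defaultInstances = 12;
    System.out.println("separate: " + consumerInstances(consumerCost,
        outputCostSeparate(sinkCost), defaultInstances));
    System.out.println("merged:   " + consumerInstances(consumerCost,
        outputCostMerged(producerSegments, sinkCost), defaultInstances));
  }
}
```

Under this toy rule, the separate treatment keeps all 12 consumer instances, while the merged treatment collapses the consumer to a single instance, even though the producer sends its rows quickly once TOP-N finishes.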
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Kurt Deschler has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 48: (4 comments) http://gerrit.cloudera.org:8080/#/c/19033/43//COMMIT_MSG Commit Message: http://gerrit.cloudera.org:8080/#/c/19033/43//COMMIT_MSG@213 PS43, Line 213: Effective parallelism of a query is the maximum upper bound of CPU core > We can rework this to be used as a starting count. However, I do think that The upper bound for each fragment should be the number of threads or something close to that. We shouldn't cap it otherwise, unless we see specific operators that can't scale linearly; in that case, the operator costing can bound it further. http://gerrit.cloudera.org:8080/#/c/19033/43/be/src/scheduling/scheduler.cc File be/src/scheduling/scheduler.cc: http://gerrit.cloudera.org:8080/#/c/19033/43/be/src/scheduling/scheduler.cc@552 PS43, Line 552: *state->GetFragmentScheduleState(fragment_state->exchange_input_fragments[0]); > Not always. This is the correct assignment if IsExceedMaxFsWriter return fa Done http://gerrit.cloudera.org:8080/#/c/19033/43/be/src/util/backend-gflag-util.cc File be/src/util/backend-gflag-util.cc: http://gerrit.cloudera.org:8080/#/c/19033/43/be/src/util/backend-gflag-util.cc@210 PS43, Line 210: > Renamed this to min_input_rows_per_thread. It is now relied on number of in I would rather keep this as a cost, since the cost of a row is a highly variable metric. http://gerrit.cloudera.org:8080/#/c/19033/43/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java File fe/src/main/java/org/apache/impala/planner/ExchangeNode.java: http://gerrit.cloudera.org:8080/#/c/19033/43/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java@263 PS43, Line 263: return deferredBatchQueueSize; > Changed the estimate cost per row to 1 / row batch size. It still doesn't seem right to divide the cost by the row batch size. The compute cost per row should be fairly constant.
Are you trying to express the network bandwidth and latency? Latency can probably be assumed to be amortized by the row batch and ignored while bandwidth cost will be constant per row. We would need some factor to connect cost units to wall time for bandwidth calculations. -- To view, visit http://gerrit.cloudera.org:8080/19033 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 48 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Tue, 14 Feb 2023 19:56:35 + Gerrit-HasComments: Yes
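The distinction between bandwidth and latency above can be sketched as a per-row cost with a constant bandwidth term and a per-batch latency term amortized across the batch. All constants (row size, link bandwidth, per-batch latency) are assumptions for illustration. Expressing the result in seconds requires the kind of factor connecting cost units to wall time that the comment mentions; here, that factor is simply an assumed bandwidth.

```java
// Hypothetical model: per-row bandwidth cost is constant,
// while per-batch latency is amortized across the rows in a batch.
final class TransferCostSketch {
  static double perRowSeconds(double bytesPerRow, double bandwidthBytesPerSec,
      double perBatchLatencySec, int rowsPerBatch) {
    double bandwidthPart = bytesPerRow / bandwidthBytesPerSec; // constant per row
    double latencyPart = perBatchLatencySec / rowsPerBatch;    // shrinks as batches grow
    return bandwidthPart + latencyPart;
  }

  public static void main(String[] args) {
    double bytesPerRow = 200;  // assumed average serialized row size
    double bandwidth = 1.25e9; // assumed ~10 Gbit/s link, in bytes per second
    double latency = 100e-6;   // assumed 100 microseconds per batch
    for (int batch : new int[] {1, 1024, 8192}) {
      System.out.printf("batch=%d perRow=%.3e s%n", batch,
          perRowSeconds(bytesPerRow, bandwidth, latency, batch));
    }
  }
}
```

As the batch grows, the per-row cost converges to the bandwidth term, which matches the point that latency can largely be ignored once amortized by the row batch.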
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 48: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/12375/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- To view, visit http://gerrit.cloudera.org:8080/19033 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 48 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Tue, 14 Feb 2023 19:44:29 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Qifan Chen has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 48: (3 comments) http://gerrit.cloudera.org:8080/#/c/19033/47//COMMIT_MSG Commit Message: http://gerrit.cloudera.org:8080/#/c/19033/47//COMMIT_MSG@138 PS47, Line 138: The costing algorithm splits a query fragment into several segments : divided by blocking PlanNode/DataSink boundary. Each fragment segment is : a subtree of PlanNodes/DataSink in the fragment with a DataSink or > I will rearrange this paragraph and clarify a bit more about "segment" ment A list implies a linear structure among blocking segments. I'm not sure it can model the graph relationship among them well. http://gerrit.cloudera.org:8080/#/c/19033/47//COMMIT_MSG@154 PS47, Line 154: 100] > To me, it should be: Yes, AGGREGATE of 12 is blocking. A sink is like the bottom part of an exchange: rows flow into it. Therefore, it should not be a blocking segment by itself. Its cost can be included in that of the tree sending data to it. http://gerrit.cloudera.org:8080/#/c/19033/47//COMMIT_MSG@258 PS47, Line 258: | > F06 and F05 has JoinBuildSink (a special kind of DataSink that is blocking) Done -- To view, visit http://gerrit.cloudera.org:8080/19033 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 48 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Tue, 14 Feb 2023 19:43:19 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 48: (4 comments) http://gerrit.cloudera.org:8080/#/c/19033/47//COMMIT_MSG Commit Message: http://gerrit.cloudera.org:8080/#/c/19033/47//COMMIT_MSG@138 PS47, Line 138: The costing algorithm splits a query fragment into several segments : divided by blocking PlanNode/DataSink boundary. Each fragment segment is : a subtree of PlanNodes/DataSink in the fragment with a DataSink or > I will rearrange this paragraph and clarify a bit more about "segment" mentioned here. Rearranged this paragraph in ps48. http://gerrit.cloudera.org:8080/#/c/19033/47//COMMIT_MSG@142 PS47, Line 142: . P > nit a query Done http://gerrit.cloudera.org:8080/#/c/19033/47//COMMIT_MSG@149 PS47, Line 149: > A fragment without any blocking nodes is called a non-blocking fragment. Done http://gerrit.cloudera.org:8080/#/c/19033/47//COMMIT_MSG@269 PS47, Line 269: > Right, "previous" sounds better. Will update this. Done -- To view, visit http://gerrit.cloudera.org:8080/19033 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 48 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Tue, 14 Feb 2023 19:32:12 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has uploaded a new patch set (#48) to the change originally created by Qifan Chen. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage ..
IMPALA-11604 Planner changes for CPU usage

This patch augments IMPALA-10992 by establishing an infrastructure that allows the weighted total amount of data to process to be used as a new factor in the definition and selection of an executor group.

As the basis of the CPU costing model, we define ProcessingCost as the cost for a distinct PlanNode / DataSink / PlanFragment to process its input rows globally across all of its instances. The costing algorithm then tries to adjust the number of instances for each fragment by considering their production-consumption ratio, and finally returns a number representing the ideal CPU core count required for a query to run efficiently. A more detailed explanation of the CPU costing algorithm is given in the four steps below.

I. Compute ProcessingCost for each plan node and data sink.

ProcessingCost of a PlanNode/DataSink is a weighted amount of data processed by that node/sink. The basic ProcessingCost is computed with the following general formula. ProcessingCost is a pair PC(D, N), where D = I * (C + M) and
  D is the weighted amount of data processed;
  I is the input cardinality;
  C is the expression evaluation cost per row, set to the total weight of expression evaluation in the node/sink;
  M is the materialization cost per row, only used by scan and exchange nodes (0 otherwise);
  N is the number of instances. It defaults to D / MIN_COST_PER_THREAD (1 million), but is fixed for certain nodes/sinks and adjustable in step III.
In this patch, the weight of each expression evaluation is set to a constant of 1.

A description of the computation for each kind of PlanNode/DataSink is given below.
01. AggregationNode: Each AggregateInfo has a C equal to the sum of its grouping expression and aggregate expression costs and is assigned its own ProcessingCost. These ProcessingCosts are then summed to form the AggregationNode's ProcessingCost.
02. AnalyticEvalNode: C is the sum of the evaluation costs for analytic functions.
03. CardinalityCheckNode: Use the general formula, with I = 1.
04. DataSourceScanNode: Follow the formula from the superclass ScanNode.
05. EmptySetNode: I = 0.
06. ExchangeNode: M = 1 / row batch size. In broadcast mode, the general formula is modified to D = D * number of receivers.
07. HashJoinNode:
      probe cost = PC(I0 * C(equiJoin predicate), N) + PC(output cardinality * C(otherJoin predicate), N)
      build cost = PC(I1 * C(equiJoin predicate), N)
    where I0 and I1 are the input cardinalities of the probe and build sides, respectively. If the plan node does not have a separate build, ProcessingCost is the sum of the probe cost and the build cost. Otherwise, ProcessingCost is equal to the probe cost.
08. HBaseScanNode, HdfsScanNode, and KuduScanNode: Follow the formula from the superclass ScanNode.
09. Nested loop join node:
    When the right child is not a SingularRowSrcNode:
      probe cost = PC(I0 * C(equiJoin predicate), N) + PC(output cardinality * C(otherJoin predicate), N)
      build cost = PC(I1 * C(equiJoin predicate), N)
    When the right child is a SingularRowSrcNode:
      probe cost = PC(I0, N)
      build cost = PC(I0 * I1, N)
    where I0 and I1 are the input cardinalities of the probe and build sides, respectively. If the plan node does not have a separate build, ProcessingCost is the sum of the probe cost and the build cost. Otherwise, ProcessingCost is equal to the probe cost.
10. ScanNode: M = 1 / row batch size.
11. SelectNode: Use the general formula.
12. SingularRowSrcNode: Since the node is involved once per input in a nested loop join, the contribution of this node is computed in the nested loop join.
13. SortNode: C is the evaluation cost for the sort expression.
14. SubplanNode: C is 1. I is the product of the cardinalities of the left and right children.
15. Union node: C is the cost of result expression evaluation from all non-pass-through children.
16. Unnest node: I is the cardinality of the containing SubplanNode and C is 1.
17. DataStreamSink: M = 1 / num rows per batch.
18. JoinBuildSink: ProcessingCost is the build cost of its associated JoinNode.
19. PlanRootSink: If result spooling is enabled, C is the cost of output expression evaluation. Otherwise, ProcessingCost is zero.
20. TableSink: C is the cost of output expression evaluation. TableSink subclasses (including HBaseTableSink, HdfsTableSink, and KuduTableSink) follow the same formula.

II. Compute the total ProcessingCost of a fragment. The costing
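The general formula from step I and the HashJoinNode rule (item 07) can be sketched in Python as follows. This is a minimal sketch assuming only the definitions above (D = I * (C + M), N defaulting to D / MIN_COST_PER_THREAD with MIN_COST_PER_THREAD = 1 million). The `PC` class and the helper functions are hypothetical stand-ins, not Impala's Java ProcessingCost API, and rounding N up with a floor of one instance is an assumption.

```python
import math
from dataclasses import dataclass

# Per the commit message: N defaults to D / MIN_COST_PER_THREAD (1 million).
MIN_COST_PER_THREAD = 1_000_000


@dataclass(frozen=True)
class PC:
    """Hypothetical stand-in for ProcessingCost: weighted data D and instances N."""
    d: float
    n: int

    def __add__(self, other: "PC") -> "PC":
        # Assumption: adding two costs sums D and keeps the larger N.
        return PC(self.d + other.d, max(self.n, other.n))


def default_instances(d: float) -> int:
    # Assumption: round up and never drop below one instance.
    return max(1, math.ceil(d / MIN_COST_PER_THREAD))


def general_cost(i: float, c: float, m: float = 0.0) -> PC:
    """D = I * (C + M); M is non-zero only for scan and exchange nodes."""
    d = i * (c + m)
    return PC(d, default_instances(d))


def hash_join_cost(i0: float, i1: float, out_card: float, c_equi: float,
                   c_other: float, n: int, separate_build: bool) -> PC:
    """Item 07: probe and build costs share the join's instance count N."""
    probe = PC(i0 * c_equi, n) + PC(out_card * c_other, n)
    build = PC(i1 * c_equi, n)
    # With a separate build, the build cost is charged to the JoinBuildSink instead.
    return probe if separate_build else probe + build


print(general_cost(3_000_000, 2))  # PC(d=6000000.0, n=6)
```

For a scan node, passing m = 1 / row batch size reproduces item 10; the batch size itself is a runtime setting and is not modeled here.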
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 47: (5 comments) http://gerrit.cloudera.org:8080/#/c/19033/46//COMMIT_MSG Commit Message: http://gerrit.cloudera.org:8080/#/c/19033/46//COMMIT_MSG@258 PS46, Line 258: 03, F04]. : : A CoreRequirement > The problem is with the sum((F05+F02), (F06+F01)) used for HJ04, as it assu On the contrary, I think the max() in the formula is intended to signify that the probe (subtree rooted at F00) cannot run in parallel with the builds (2 subtrees rooted at F05 and F06). The two build fragments, however, can run in parallel (in MT_DOP, they are in separate fragments from the HJ node), hence the sum(). The two build subtrees need 8 cores in total. The probe subtree needs 12 cores (dictated by 00:SCAN, which still follows MT_DOP rules). Since the probe and the builds cannot run at the same time, max(12, 8) is taken as the CoreRequirement for the subtree rooted at F00. http://gerrit.cloudera.org:8080/#/c/19033/47//COMMIT_MSG Commit Message: http://gerrit.cloudera.org:8080/#/c/19033/47//COMMIT_MSG@138 PS47, Line 138: A query fragment has a processingCosts_ field, a list of ProcessingCost : associated with the fragment. Each element of processingCosts_ may : solely come from a single PlanNode or sum of adjacent PlanNode costs. > This para probably should be reworded as follows and moved after the para e I will rearrange this paragraph and clarify a bit more about "segment" mentioned here. Now I realize that in the example I added below, the last segment contains only a DataStreamSink, which is not a blocking DataSink, so it does not really fit the definition of "blocking segment" written here. I should probably just call it "segment" instead of "blocking segment".
I don't see the need to add the new totalFragmentProcessingCost_ field and blockingSegmentProcessingCosts_ map in the code, since the processingCosts_ list is sufficient for iteration in PlanFragment.tryLowerParallelism(). But I understand the confusion about what the list is trying to do. Maybe renaming it to segmentProcessingCosts_ would be clearer? Please let me know if it is coherent enough after my rewrite. http://gerrit.cloudera.org:8080/#/c/19033/47//COMMIT_MSG@154 PS47, Line 154: 34550429, 2159270, 23752870, 1 > There exist two blocking segments in F03: To me, it should be: There are four segments in F03: 1. 12(11), 2. 06, 3. 08(07), 4. F03 sink. Therefore we have PC(segment 1) = 2109 + 34548320, PC(segment 2) = 2159270, PC(segment 3) = 23751970 + 900, PC(segment 4) = 1. http://gerrit.cloudera.org:8080/#/c/19033/47//COMMIT_MSG@258 PS47, Line 258: (F05, F02), (F06, F01) > These two do not follow the definition of a blocking subtree in that both r F06 and F05 have a JoinBuildSink (a special kind of DataSink that is blocking), hence these fragments are blocking. I will update line 252 to mention blocking DataSinks as well. http://gerrit.cloudera.org:8080/#/c/19033/47//COMMIT_MSG@269 PS47, Line 269: successor > not clear. You mean previous? Right, "previous" sounds better. Will update this. -- To view, visit http://gerrit.cloudera.org:8080/19033 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 47 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Tue, 14 Feb 2023 17:43:37 + Gerrit-HasComments: Yes
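The four-segment breakdown of F03 above can be reproduced with a short Python sketch. It assumes the fragment is a single chain of nodes listed bottom-up, that a segment closes right after each blocking node, and that the DataSink roots the last segment. It also treats 08:TOP-N as blocking, so that the result matches the quoted fragment-costs. `split_segments` and the blocking flags are illustrative only, not the patch's Java code.

```python
from typing import List, Tuple

# (label, cost, is_blocking), ordered bottom-up along a single chain of nodes.
Node = Tuple[str, int, bool]


def split_segments(nodes: List[Node], sink_cost: int) -> List[int]:
    """Close a segment after each blocking node; the sink roots the last one."""
    segments: List[int] = []
    current = 0
    for _label, cost, is_blocking in nodes:
        current += cost
        if is_blocking:
            segments.append(current)
            current = 0
    # Any nodes above the last blocking node share a segment with the sink.
    segments.append(current + sink_cost)
    return segments


# Costs from the F03 example in the commit message; 08:TOP-N assumed blocking.
f03 = [
    ("11:EXCHANGE", 2109, False),
    ("12:AGGREGATE", 34548320, True),
    ("06:SORT", 2159270, True),
    ("07:ANALYTIC", 23751970, False),
    ("08:TOP-N", 900, True),
]
print(split_segments(f03, sink_cost=1))  # [34550429, 2159270, 23752870, 1]
```

Because 08:TOP-N closes its segment, the sink ends up alone in segment 4. This is the case the thread debates: whether a non-blocking sink should stand as a segment of its own.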
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Qifan Chen has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 47: (8 comments) http://gerrit.cloudera.org:8080/#/c/19033/46//COMMIT_MSG Commit Message: http://gerrit.cloudera.org:8080/#/c/19033/46//COMMIT_MSG@138 PS46, Line 138: a list of ProcessingCost > The MergeJoin scenario you mention sounds possible. I believe such a case can Yeah, I agree that the feature in IV that can look at the connected fragments is a plus. However, the same idea can be implemented here too. Thanks a lot for checking the MJ case, which illustrates the importance of operator-driven decisions on DoP for blocking operators. Hopefully this can be handled in the implementation based on steps II, III and IV. http://gerrit.cloudera.org:8080/#/c/19033/46//COMMIT_MSG@258 PS46, Line 258: 03, F04]. : : A CoreRequirement > The sequential nature of build-probe is resolved through the CoreRequiremen The problem is with the sum((F05+F02), (F06+F01)) used for HJ04, as it assumes the HJ will build and probe at the same time. Looks like step IV should look into query nodes (same idea as collapsing into the PC computation). http://gerrit.cloudera.org:8080/#/c/19033/47//COMMIT_MSG Commit Message: http://gerrit.cloudera.org:8080/#/c/19033/47//COMMIT_MSG@138 PS47, Line 138: A query fragment has a processingCosts_ field, a list of ProcessingCost : associated with the fragment. Each element of processingCosts_ may : solely come from a single PlanNode or sum of adjacent PlanNode costs. This para probably should be reworded as follows and moved after the para ending at line 149, as it is better to introduce the concept of blocking segments first and then define some properties for it. Two new data members are introduced for the query fragment as follows. 1. totalFragmentProcessingCost_ - the total fragment processing cost; 2.
blockingSegmentProcessingCosts_ - a map of processing costs for each blocking segment, keyed on the root node of the blocking segment. Thus for a non-blocking segment, blockingSegmentProcessingCosts_ is empty. For a blocking segment, it has an entry in the map, summarizing the total processing cost for the blocking segment. http://gerrit.cloudera.org:8080/#/c/19033/47//COMMIT_MSG@142 PS47, Line 142: the nit a query http://gerrit.cloudera.org:8080/#/c/19033/47//COMMIT_MSG@149 PS47, Line 149: further in step IV. A fragment without any blocking nodes is called a non-blocking fragment. http://gerrit.cloudera.org:8080/#/c/19033/47//COMMIT_MSG@154 PS47, Line 154: 34550429, 2159270, 23752870, 1 There exist two blocking segments in F03: 1. 08(07) 2. 06(12(11)) Therefore we have PC(blocking segment 1) = 2159270 + 34548320 + 2109 PC(blocking segment 2) = 900 + 23751970 + 1 http://gerrit.cloudera.org:8080/#/c/19033/47//COMMIT_MSG@258 PS47, Line 258: (F05, F02), (F06, F01) These two do not follow the definition of a blocking subtree in that both root fragments, F05 and F06, are non-blocking. http://gerrit.cloudera.org:8080/#/c/19033/47//COMMIT_MSG@269 PS47, Line 269: successor not clear. You mean previous? -- To view, visit http://gerrit.cloudera.org:8080/19033 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 47 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Tue, 14 Feb 2023 16:33:35 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 47: (4 comments) http://gerrit.cloudera.org:8080/#/c/19033/46//COMMIT_MSG Commit Message: http://gerrit.cloudera.org:8080/#/c/19033/46//COMMIT_MSG@149 PS46, Line 149: further in step IV. : : Take an example of the following fragment plan. : : F03:PLAN FRAGMENT [HASH(i_class)] hosts=3 instances=3 : fragment-costs=[34550429, 2159270, 23752870, 1] : 08:TOP-N [LIMIT=100] : | cost=900 : | : 07:ANALYTIC : | cost=23751970 : | : 06:SORT : | cost=2159270 : | : 12:AGGREGA > nit. Indentation can help readability. Done http://gerrit.cloudera.org:8080/#/c/19033/46//COMMIT_MSG@190 PS46, Line 190: ing algorithm recursively walks, bottom-up, all PlanFragments in : the q > See my comment on the removal of the list of processingCost above. Done http://gerrit.cloudera.org:8080/#/c/19033/46//COMMIT_MSG@206 PS46, Line 206: overlapping between fragment execution and blocking operators. We : compute this in a similar recursive walk as step III and split the query : tree into blocking fragment subtrees similar to step II. The following : is an example of a query plan from TPCDS-Q12. 
: : F04:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 : PLAN-ROOT SINK : | : 13:MERGING-EXCHANGE [UNPARTITIONED] : | : F03:PLAN FRAGMENT [HASH(i_class)] hosts=3 instances=3 (adjusted from 12) : 08:TOP-N [LIMIT=100] : | : 07:ANALYTIC : | : 06:SORT : | : 12:AGGREGATE [FINALIZE] : | : 11:EXCHANGE [HASH(i_class)] : | : F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=12 : 05:AGGREGATE [STREAMING] : | : 04:HASH JOIN [INNER JOIN, BROADCAST] : | : |--F05:PLAN FRAGMENT [RANDOM] hosts=3 instances=3 : | JOIN BUILD : | | : | 10:EXCHANGE [BROADCAST] : | | : | F02:PLAN FRAGMENT [RANDOM] hosts=1 instances=1 : | 02:SCAN HDFS [tpcds10_parquet.date_dim, RANDOM] : | : 03:HASH JOIN [INNER JOIN, BROADCAST] : | : |--F06:PLAN FRAGMENT [RANDOM] hosts=3 instances=3 : | JOIN BUILD : | | : | 09:EXCHANGE [BROADCAST] > nit indentation. Done http://gerrit.cloudera.org:8080/#/c/19033/46//COMMIT_MSG@253 PS46, Line 253: he query > The order is flipped here. Done -- To view, visit http://gerrit.cloudera.org:8080/19033 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 47 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Mon, 13 Feb 2023 21:18:39 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has uploaded a new patch set (#47) to the change originally created by Qifan Chen. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage ..
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 46: (4 comments) http://gerrit.cloudera.org:8080/#/c/19033/46//COMMIT_MSG Commit Message: http://gerrit.cloudera.org:8080/#/c/19033/46//COMMIT_MSG@138 PS46, Line 138: a list of ProcessingCost > I think a single processingCost (instead of a list of processingCost) may b The MergeJoin scenario you mention sounds possible. I believe such a case can be resolved in step IV with careful inspection of fragment members, without the need to merge steps II, III, and IV into a single pass. I also think the current separation is worth keeping, since steps II and III focus on individual fragments while step IV reconciles the results from step III by looking at connected fragments and reasoning about their combined parallelism. That being said, I talked with Aman and found out that the Impala planner currently does not support sort-merge join. The Impala planner never creates a fragment plan with a chain of SCAN -> SORT -> JOIN, since the JOIN is either a hash join or a plain nested loop join. In both cases, sorted input is not needed. I do see EXCHANGE -> AGG -> JOIN within a fragment in some TPC-DS queries (e.g., Q47). That is similar to your example, in that AGG is also a blocking operator like SORT. But in that case, the current algorithm in step IV is helped by the fact that such a fragment always has a child fragment feeding into it that is also considered a blocking fragment (it has a preAGG), leading to a correct comparison between the root subtree and the child subtrees. For further reassurance, I have tested the current algorithm against TPC-DS at 3TB scale. I don't see any major regression due to skewed parallelism from bad decisions in step IV.
There are 2 queries that regress by more than 10% compared to the regular MT_DOP plan, but that is due to wrong cardinality estimates caused by runtime filters and the fact that this patch does not change scan fragment parallelism yet. I plan to improve that in follow-up work. http://gerrit.cloudera.org:8080/#/c/19033/46//COMMIT_MSG@247 PS46, Line 247: A blocking fragment is a fragment that has a blocking plan node. The : costing algorithm split the query plan tree into several blocking : subtrees divided by blocking fragments. Each blocking subtree has a : blocking fragment as a root and non-blocking fragments as leaves. All : other fragments in the subtree are non-blocking. > This part of the para should be merged into the para at line 142. I can move the blocking-fragment definition to the paragraph at line 142. But I think the blocking-subtree definition should stay here, since the discussion in step II focuses only on a single fragment. http://gerrit.cloudera.org:8080/#/c/19033/46//COMMIT_MSG@253 PS46, Line 253: F03, F00 The order is flipped here. http://gerrit.cloudera.org:8080/#/c/19033/46//COMMIT_MSG@258 PS46, Line 258: This means that all fragments within a : blocking subtree can run in parallel and should be assigned 1 core per : fragment instance > This may overestimate when HJ is involved, since the build side and the pro The sequential nature of build-probe is resolved through the CoreRequirement formula. At F00, the CoreRequirement is resolved to: max(F00, sum((F05+F02), (F06+F01))) = max(12, sum(4, 4)) = 12. The join node subtree (probe) dominates the sum of the child subtrees (both builds).
-- To view, visit http://gerrit.cloudera.org:8080/19033 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 46 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Mon, 13 Feb 2023 20:36:09 + Gerrit-HasComments: Yes
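The CoreRequirement resolution at F00 can be checked with a small Python sketch. It assumes the instance counts shown in the TPCDS-Q12 plan (F00: 12; F05 and F06: 3 each; F02 and F01: 1 each), that fragments inside one blocking subtree each take one core per instance, and that the probe never overlaps its builds while independent builds may overlap each other. It covers only this single join level, not the recursive walk over the whole plan. The function names are hypothetical.

```python
from typing import List


def subtree_cores(fragment_instances: List[int]) -> int:
    """Fragments inside one blocking subtree run concurrently: one core per instance."""
    return sum(fragment_instances)


def core_requirement(probe_cores: int, build_subtree_cores: List[int]) -> int:
    """The probe cannot overlap its builds, but independent builds can overlap each other."""
    return max(probe_cores, sum(build_subtree_cores))


# TPCDS-Q12: F00 has 12 instances; each build subtree is a 3-instance
# JOIN BUILD fragment fed by a 1-instance scan fragment.
builds = [
    subtree_cores([3, 1]),  # F05 + F02
    subtree_cores([3, 1]),  # F06 + F01
]
print(core_requirement(12, builds))  # 12
```

With fewer probe instances (say 2), the builds would dominate and the result would be 8.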
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Qifan Chen has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 46: (7 comments) Thanks a lot, Riza, for the update to the commit message, in particular the examples, which help me understand steps 2, 3 and 4 better. I provided an example to emphasize the importance of integrating the logic in steps 2, 3 and 4 into the computation of PC(node). Please let me know if this will fly. http://gerrit.cloudera.org:8080/#/c/19033/46//COMMIT_MSG Commit Message: http://gerrit.cloudera.org:8080/#/c/19033/46//COMMIT_MSG@138 PS46, Line 138: a list of ProcessingCost I think a single processingCost (instead of a list of processingCost) may be sufficient for a fragment to compute the effective DoP. As we walk bottom-up, we compute it for each blocking segment from its children and finally for the entire fragment. Take exchange(sort(join(sort(scan T1), sort(scan T2)))) as an example; we can compute 1. PC(sort(scan T1)) = (x, 10) - cost of x, 10 cores 2. PC(sort(scan T2)) = (y, 16) - cost of y, 16 cores 3. PC(join) = (x, 10) + (y, 16) = (x+y, 26), if we want the two sort nodes to work in parallel for a merge join. For a hash join, we should take the max on # cores to arrive at (z, 16), since the build side is done first, followed by the probe side. 4. PC(sort(join)) = PC(join), since this top sort cannot proceed until the join below produces all the results. That is, there is no need to allocate extra cores for this top sort. 5. The # of cores is then available: 26 for MJ, 16 for HJ for the entire fragment. Note that in this example, the computation of # of cores takes care of blocking nodes and, more importantly, the semantics of query nodes. As a bonus, we may be able to get rid of steps II, III and IV altogether. To make it work efficiently, PC(node) can be extended into (C, N, blocking), where the boolean flag is 1. true when the subtree rooted at node contains a blocking node 2.
false otherwise. http://gerrit.cloudera.org:8080/#/c/19033/46//COMMIT_MSG@149 PS46, Line 149: F03:PLAN FRAGMENT [HASH(i_class)] hosts=3 instances=3 : fragment-costs=[34550429, 2159270, 23752870, 1] : 08:TOP-N [LIMIT=100] : | cost=900 : | : 07:ANALYTIC : | cost=23751970 : | : 06:SORT : | cost=2159270 : | : 12:AGGREGATE [FINALIZE] : | cost=34548320 : | : 11:EXCHANGE [HASH(i_class)] :cost=2109 nit. Indentation can help readability. http://gerrit.cloudera.org:8080/#/c/19033/46//COMMIT_MSG@185 PS46, Line 185: the query plan tree visiting : PlanFragment from the leaf and going up to the root nit. ,bottom-up, all PlanFragments in the query plan tree. http://gerrit.cloudera.org:8080/#/c/19033/46//COMMIT_MSG@190 PS46, Line 190: its adjacent blocking segments (elements in processingCosts_ : list) See my comment on the removal of the list of processingCost above. http://gerrit.cloudera.org:8080/#/c/19033/46//COMMIT_MSG@206 PS46, Line 206: F04:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 : PLAN-ROOT SINK : | : 13:MERGING-EXCHANGE [UNPARTITIONED] : | : F03:PLAN FRAGMENT [HASH(i_class)] hosts=3 instances=3 (adjusted from 12) : 08:TOP-N [LIMIT=100] : | : 07:ANALYTIC : | : 06:SORT : | : 12:AGGREGATE [FINALIZE] : | : 11:EXCHANGE [HASH(i_class)] : | : F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=12 : 05:AGGREGATE [STREAMING] : | : 04:HASH JOIN [INNER JOIN, BROADCAST] : | : |--F05:PLAN FRAGMENT [RANDOM] hosts=3 instances=3 : | JOIN BUILD : | | : | 10:EXCHANGE [BROADCAST] : | | : | F02:PLAN FRAGMENT [RANDOM] hosts=1 instances=1 : | 02:SCAN HDFS [tpcds10_parquet.date_dim, RANDOM] : | : 03:HASH JOIN [INNER JOIN, BROADCAST] : | : |--F06:PLAN FRAGMENT [RANDOM] hosts=3 instances=3 : | JOIN BUILD : | | : | 09:EXCHANGE [BROADCAST] : | | : | F01:PLAN FRAGMENT [RANDOM] hosts=1 instances=1 : | 01:SCAN HDFS [tpcds10_parquet.item, RANDOM] : | : 00:SCAN HDFS [tpcds10_parquet.web_sales, RANDOM] nit indentation. 
http://gerrit.cloudera.org:8080/#/c/19033/46//COMMIT_MSG@247 PS46, Line 247: A blocking fragment is a fragment that has a blocking plan node. The : costing
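The alternative proposed above, where each node carries (C, N, blocking) and core counts are combined according to join semantics, might look like the following Python sketch. The placeholder costs 100 and 200 stand in for the symbolic x and y and are not measured values. The proposal leaves the hash-join cost as "z", so summing costs there is an assumption. Adding a blocking parent's own cost to C is also an assumption. `NodeCost` and the helper functions are hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class NodeCost:
    """Hypothetical (C, N, blocking) triple: cost, cores, and whether the
    subtree rooted at the node contains a blocking node."""
    cost: int
    cores: int
    blocking: bool


def merge_join(left: NodeCost, right: NodeCost) -> NodeCost:
    # Both sorted inputs are produced concurrently, so their cores add up.
    return NodeCost(left.cost + right.cost, left.cores + right.cores,
                    left.blocking or right.blocking)


def hash_join(probe: NodeCost, build: NodeCost) -> NodeCost:
    # The build finishes before probing starts, so only the larger core count
    # is needed. Assumption: costs are summed (the proposal leaves this as "z").
    return NodeCost(probe.cost + build.cost, max(probe.cores, build.cores),
                    probe.blocking or build.blocking)


def blocking_parent(child: NodeCost, own_cost: int) -> NodeCost:
    # A blocking parent (the top SORT) waits for its child to finish, so it
    # reuses the child's cores. Assumption: its own cost is added to C.
    return NodeCost(child.cost + own_cost, child.cores, True)


# Placeholder costs stand in for the symbolic x and y in the proposal.
t1 = NodeCost(cost=100, cores=10, blocking=True)  # sort(scan T1)
t2 = NodeCost(cost=200, cores=16, blocking=True)  # sort(scan T2)
print(blocking_parent(merge_join(t1, t2), 0).cores)  # 26
print(blocking_parent(hash_join(t1, t2), 0).cores)   # 16
```

The core counts match the 26 (MJ) and 16 (HJ) figures in the proposal. Only the combination rules differ between the two joins.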
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 46: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/12353/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- To view, visit http://gerrit.cloudera.org:8080/19033 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 46 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Fri, 10 Feb 2023 21:51:00 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 46: (12 comments) Thank you, Qifan, for continuing to shepherd this patch. I added some more edits to the commit message in patch set 46, along with some examples. I hope it is clearer now. http://gerrit.cloudera.org:8080/#/c/19033/44//COMMIT_MSG Commit Message: http://gerrit.cloudera.org:8080/#/c/19033/44//COMMIT_MSG@144 PS44, Line 144: subtree of PlanNodes/DataSink in the > Each blocking segment is a subtree of the PlanNodes in the fragment with a Done http://gerrit.cloudera.org:8080/#/c/19033/44//COMMIT_MSG@146 PS44, Line 146: PlanNodes or DataSink that belong to the same segment will have their > nit the This is rephrased in ps46. http://gerrit.cloudera.org:8080/#/c/19033/44//COMMIT_MSG@154 PS44, Line 154: > nit missing 's' Done http://gerrit.cloudera.org:8080/#/c/19033/44//COMMIT_MSG@158 PS44, Line 158: > May need to further explain, say with the concept of a blocking fragment. The whole section III is rephrased in ps46. http://gerrit.cloudera.org:8080/#/c/19033/44//COMMIT_MSG@161 PS44, Line 161: > nit decides This is rephrased in ps46. http://gerrit.cloudera.org:8080/#/c/19033/44//COMMIT_MSG@177 PS44, Line 177: fragment's O > nit "At the blocking fragment" is better here as it 'blocking fragment' is Rephrased in ps46. http://gerrit.cloudera.org:8080/#/c/19033/44//COMMIT_MSG@177 PS44, Line 177: ngCost is be > same. use of blocking segment is better. Rephrased. http://gerrit.cloudera.org:8080/#/c/19033/44//COMMIT_MSG@178 PS44, Line 178: of the parent f > It is better to define CoreRequirement first before use. It is not clear wh Added definition and examples in ps46. http://gerrit.cloudera.org:8080/#/c/19033/44//COMMIT_MSG@188 PS44, Line 188: rallelism) of that fragment > Old name.
Done http://gerrit.cloudera.org:8080/#/c/19033/44//COMMIT_MSG@195 PS44, Line 195: This > query_cpu_requirement_scale or its new form should be listed here as well. Done http://gerrit.cloudera.org:8080/#/c/19033/44//COMMIT_MSG@200 PS44, Line 200: lle > Do we need a min_threads control? If not, please add a comment here to exp The adjustment can go lower if the Consumer fragment is deemed faster than the Producer fragment in terms of ProcessingCost. Added clarification in ps46. http://gerrit.cloudera.org:8080/#/c/19033/44//COMMIT_MSG@212 PS44, Line 212: : | > for completeness, better to list them here as like the three FE ones. Done -- To view, visit http://gerrit.cloudera.org:8080/19033 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 46 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Fri, 10 Feb 2023 21:41:44 + Gerrit-HasComments: Yes
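The reply above says the adjustment can lower a consumer fragment's instance count when that fragment is cheaper than its producer in ProcessingCost terms. The Java sketch below shows one hypothetical way a ratio-based adjustment could behave. The class, the method, and the "match the producer's per-instance cost" rule are illustrative assumptions, not the actual IMPALA-11604 algorithm.

```java
/**
 * Hypothetical sketch: size a consumer fragment so that its per-instance
 * ProcessingCost roughly matches the producer's per-instance cost.
 * This is NOT the actual IMPALA-11604 implementation.
 */
final class ConsumerAdjustSketch {
  static int adjustConsumerInstances(double producerCost, int producerInstances,
      double consumerCost, int maxInstances) {
    if (producerInstances < 1 || maxInstances < 1) {
      throw new IllegalArgumentException("instance counts must be >= 1");
    }
    // Assumption: with no producer cost there is nothing to balance against.
    if (producerCost <= 0) return 1;
    double producerCostPerInstance = producerCost / producerInstances;
    // A cheaper consumer gets fewer instances than the producer; a costlier one gets more.
    int wanted = (int) Math.ceil(consumerCost / producerCostPerInstance);
    // Clamp to [1, maxInstances]; the cap stands in for a configured upper limit.
    return Math.max(1, Math.min(maxInstances, wanted));
  }
}
```

For instance, a producer with cost 8,000,000 spread over 8 instances carries 1,000,000 per instance, so a consumer costing 2,000,000 would be assigned 2 instances, fewer than the producer.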
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has uploaded a new patch set (#46) to the change originally created by Qifan Chen. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. IMPALA-11604 Planner changes for CPU usage This patch augments IMPALA-10992 by establishing an infrastructure to allow the weighted total amount of data to process to be used as a new factor in the definition and selection of an executor group. As the basis of the CPU costing model, we define ProcessingCost as the cost for a distinct PlanNode / DataSink / PlanFragment to process its input rows globally across all of its instances. The costing algorithm then tries to adjust the number of instances for each fragment by considering their production-consumption ratio, and finally returns a number representing an ideal CPU core count required for a query to run efficiently. A more detailed explanation of the CPU costing algorithm can be found in the four steps below. I. Compute ProcessingCost for each plan node and data sink. The ProcessingCost of a PlanNode/DataSink is the weighted amount of data processed by that node/sink. The basic ProcessingCost is computed with the following general formula. ProcessingCost is a pair PC(D, N), where D = I * (C + M) and: D is the weighted amount of data processed; I is the input cardinality; C is the expression evaluation cost per row, set to the total weight of expression evaluation in the node/sink; M is the materialization cost per row, used only by scan and exchange nodes (otherwise 0); N is the number of instances, which defaults to D / MIN_COST_PER_THREAD (1 million), but is fixed for certain nodes/sinks and adjustable in step III. In this patch, the weight of each expression evaluation is set to a constant of 1. A description of the computation for each kind of PlanNode/DataSink is given below. 01. 
AggregationNode: Each AggregateInfo has its C as the sum of its grouping and aggregate expression costs, and is then assigned its own ProcessingCost. These ProcessingCosts are then summed to form the AggregationNode's ProcessingCost; 02. AnalyticEvalNode: C is the sum of the evaluation costs for analytic functions; 03. CardinalityCheckNode: Use the general formula, I = 1; 04. DataSourceScanNode: Follow the formula from the superclass ScanNode; 05. EmptySetNode: I = 0; 06. ExchangeNode: M = 1 / row batch size. In broadcast mode, the general formula is modified to D = D * number of receivers; 07. HashJoinNode: probe cost = PC(I0 * C(equiJoin predicate), N) + PC(output cardinality * C(otherJoin predicate), N); build cost = PC(I1 * C(equiJoin predicate), N), with I0 and I1 as the input cardinalities of the probe and build sides, respectively. If the plan node does not have a separate build, ProcessingCost is the sum of the probe cost and build cost. Otherwise, ProcessingCost is equal to the probe cost. 08. HbaseScanNode, HdfsScanNode, and KuduScanNode: Follow the formula from the superclass ScanNode; 09. Nested loop join node: When the right child is not a SingularRowSrcNode: probe cost = PC(I0 * C(equiJoin predicate), N) + PC(output cardinality * C(otherJoin predicate), N); build cost = PC(I1 * C(equiJoin predicate), N). When the right child is a SingularRowSrcNode: probe cost = PC(I0, N); build cost = PC(I0 * I1, N). I0 and I1 are the input cardinalities of the probe and build sides, respectively. If the plan node does not have a separate build, ProcessingCost is the sum of the probe cost and build cost. Otherwise, ProcessingCost is equal to the probe cost. 10. ScanNode: M = 1 / row batch size; 11. SelectNode: Use the general formula; 12. SingularRowSrcNode: Since the node is involved once per input in a nested loop join, its contribution is computed in the nested loop join; 13. SortNode: C is the evaluation cost for the sort expression; 14. SubplanNode: C is 1. 
I is the product of the cardinalities of the left and right children; 15. Union node: C is the cost of result expression evaluation from all non-pass-through children; 16. Unnest node: I is the cardinality of the containing SubplanNode and C is 1; 17. DataStreamSink: M = 1 / num rows per batch; 18. JoinBuildSink: ProcessingCost is the build cost of its associated JoinNode; 19. PlanRootSink: If result spooling is enabled, C is the cost of output expression evaluation. Otherwise, ProcessingCost is zero; 20. TableSink: C is the cost of output expression evaluation. TableSink subclasses (including HBaseTableSink, HdfsTableSink, and KuduTableSink) follow the same formula. II. Compute the total ProcessingCost of a fragment. A query fragment
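As a rough illustration of step I, the general formula D = I * (C + M), with N defaulting to D / MIN_COST_PER_THREAD, can be sketched in Java as below. The class and method names are hypothetical and do not reflect Impala's actual ProcessingCost API. Rounding N up and flooring it at one instance are assumptions of this sketch. The exchange helper follows item 06: M = 1 / row batch size, and in broadcast mode D is multiplied by the number of receivers.

```java
/**
 * Hypothetical sketch of the ProcessingCost pair PC(D, N) from step I.
 * Names are illustrative only, not Impala's ProcessingCost class.
 */
final class PcSketch {
  // From the commit message: MIN_COST_PER_THREAD is 1 million.
  static final double MIN_COST_PER_THREAD = 1_000_000.0;

  final double weightedData;  // D
  final int numInstances;     // N (default, before any step III adjustment)

  private PcSketch(double weightedData) {
    this.weightedData = weightedData;
    // Assumption: round up and never go below one instance.
    this.numInstances = (int) Math.max(1, Math.ceil(weightedData / MIN_COST_PER_THREAD));
  }

  /** General formula: D = I * (C + M). */
  static PcSketch general(long inputCardinality, double exprCostPerRow,
      double materializationCostPerRow) {
    return new PcSketch(inputCardinality * (exprCostPerRow + materializationCostPerRow));
  }

  /** ExchangeNode: M = 1 / row batch size; broadcast multiplies D by the receiver count. */
  static PcSketch exchange(long inputCardinality, double exprCostPerRow, int rowBatchSize,
      boolean broadcast, int numReceivers) {
    PcSketch base = general(inputCardinality, exprCostPerRow, 1.0 / rowBatchSize);
    return broadcast ? new PcSketch(base.weightedData * numReceivers) : base;
  }
}
```

With expression weights fixed at 1, a SelectNode with one predicate over 3,000,000 input rows gets D = 3,000,000 and a default N of 3.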
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Qifan Chen has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 45: (13 comments) Many thanks for the rework. Added comments for the commit message in this review session, mainly to help me understand the adjustment algorithm better. Plan to do another round of review of the adjustment algorithm itself once I get the feedback. http://gerrit.cloudera.org:8080/#/c/19033/44//COMMIT_MSG Commit Message: http://gerrit.cloudera.org:8080/#/c/19033/44//COMMIT_MSG@144 PS44, Line 144: same segment will have their Process Each blocking segment is a subtree of the PlanNodes in the fragment with a blocking root and blocking/non-blocking leaves. All other nodes in the subtree are non-blocking. http://gerrit.cloudera.org:8080/#/c/19033/44//COMMIT_MSG@146 PS44, Line 146: called Input ProcessingCost, while the last cost in nit the http://gerrit.cloudera.org:8080/#/c/19033/44//COMMIT_MSG@154 PS44, Line 154: t.co nit missing 's' http://gerrit.cloudera.org:8080/#/c/19033/44//COMMIT_MSG@158 PS44, Line 158: May need to further explain, say with the concept of a blocking fragment. It is not clear to me which fragment is adjusted. http://gerrit.cloudera.org:8080/#/c/19033/44//COMMIT_MSG@161 PS44, Line 161: just the nit decides http://gerrit.cloudera.org:8080/#/c/19033/44//COMMIT_MSG@177 PS44, Line 177: meaning that nit "At the blocking fragment" is better here as it 'blocking fragment' is well defined. http://gerrit.cloudera.org:8080/#/c/19033/44//COMMIT_MSG@177 PS44, Line 177: s can run in same. use of blocking segment is better. http://gerrit.cloudera.org:8080/#/c/19033/44//COMMIT_MSG@178 PS44, Line 178: core per fragme It is better to define CoreRequirement first before use. It is not clear whether it is for a fragment or for a node. http://gerrit.cloudera.org:8080/#/c/19033/44//COMMIT_MSG@188 PS44, Line 188: pared against the total CPU Old name. 
http://gerrit.cloudera.org:8080/#/c/19033/44//COMMIT_MSG@195 PS44, Line 195:Co query_cpu_requirement_scale or its new form should be listed here as well. http://gerrit.cloudera.org:8080/#/c/19033/44//COMMIT_MSG@200 PS44, Line 200: m i Do we need a min_threads control? If not, please add a comment here to explain the adjustment is always increasing. http://gerrit.cloudera.org:8080/#/c/19033/44//COMMIT_MSG@212 PS44, Line 212: : for completeness, better to list them here as like the three FE ones. http://gerrit.cloudera.org:8080/#/c/19033/44/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test File testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test: http://gerrit.cloudera.org:8080/#/c/19033/44/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test@75 PS44, Line 75: ss_sold_date_sk = > Done. Add max-parallelism and fragment-costs as well. Nice. Thanks! -- To view, visit http://gerrit.cloudera.org:8080/19033 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 45 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Fri, 10 Feb 2023 16:24:56 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 45: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/12349/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- To view, visit http://gerrit.cloudera.org:8080/19033 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 45 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Fri, 10 Feb 2023 05:37:59 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 45: (11 comments) http://gerrit.cloudera.org:8080/#/c/19033/44//COMMIT_MSG Commit Message: http://gerrit.cloudera.org:8080/#/c/19033/44//COMMIT_MSG@16 PS44, Line 16: then finally returns a > May need to explain why blocking operators are considered in the context of Removed this phrase here and put more explanation in steps II and III. http://gerrit.cloudera.org:8080/#/c/19033/44//COMMIT_MSG@19 PS44, Line 19: > nit found Done http://gerrit.cloudera.org:8080/#/c/19033/37/be/src/util/backend-gflag-util.cc File be/src/util/backend-gflag-util.cc: http://gerrit.cloudera.org:8080/#/c/19033/37/be/src/util/backend-gflag-util.cc@201 PS37, Line 201: 2 w > The scaling factor as defined is less intuitive, since one has to inverse i Renamed to query_cpu_requirement_divisor. http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/PlanFragment.java File fe/src/main/java/org/apache/impala/planner/PlanFragment.java: http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/PlanFragment.java@147 PS37, Line 147: Costs_ = null; : private List > nit. A positive value implies the instance count has been adjusted. Done http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/PlanFragment.java@864 PS37, Line 864: r(Predicates.i > Better renamed as getMaxParallelismByTotalWorkSize(). Done http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/PlanFragment.java@883 PS37, Line 883:Preconditions.checkState(p.getChildCount() > 1); : long buildRowCount = p.getChild(1).getCardinality(); : if (((JoinNode) p).getDistributionMode() == DistributionMode.BROADCAST) { : // For Broadcast join, all join receive the same work size from the build. 
: buildRowCount = buildRowCount * p.getNumInstances(); : } : > Repeated use from line 869. Can be refactored. Removed. http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/PlanFragment.java@957 PS37, Line 957: protected int getAdjustedInstanceCount() { return adjustedInstanceCount_; } > Should add a comment. Done http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/PlanFragment.java@972 PS37, Line 972: ngBuilder builder = new StringBuilder(); : for (int i = processingCosts_.size() - 1; i >= 0; i--) > I wonder if the computation can be improved here e.g. by the size of the wo In newer patch set, this is later lowered based on producer-consumer rate ratio. http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/PlanFragment.java@1098 PS37, Line 1098: Math.max(nodeStepCount, getMaxParallelismByTotalWorkSize()); > add a comment should be helpful. Done http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/PlanFragment.java@1113 PS37, Line 1113: > does not sound right. Removed. http://gerrit.cloudera.org:8080/#/c/19033/44/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test File testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test: http://gerrit.cloudera.org:8080/#/c/19033/44/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test@75 PS44, Line 75: ss_sold_date_sk = > Is it possible to show the new processing cost here too? It will be wonderf Done. Add max-parallelism and fragment-costs as well. 
-- To view, visit http://gerrit.cloudera.org:8080/19033 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 45 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Fri, 10 Feb 2023 05:28:40 + Gerrit-HasComments: Yes
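The quoted PlanFragment snippet multiplies the build-side row count by the number of instances for a broadcast join, since every join instance receives the full build input. A later reply in this thread (PS44) notes that min_input_rows_per_thread now counts input rows from exchange, scan, and join build. The Java sketch below combines the two ideas into a parallelism bound. Everything here (the class, the local DistributionMode enum, the floor division, and the hardCap parameter) is a hypothetical illustration, not the patch's getMaxParallelismByTotalWorkSize().

```java
/** Hypothetical sketch of bounding parallelism by total input rows. */
final class WorkSizeSketch {
  // Local stand-in for the join distribution mode referenced in the snippet.
  enum DistributionMode { BROADCAST, PARTITIONED }

  /** Build-side rows processed across all join instances. */
  static long buildWorkSize(long buildRowCount, DistributionMode mode, int numInstances) {
    // For a broadcast join, every instance receives the same full build input.
    return mode == DistributionMode.BROADCAST ? buildRowCount * numInstances : buildRowCount;
  }

  /**
   * Largest instance count that still gives each instance at least
   * minInputRowsPerThread rows, except that one instance is always allowed.
   */
  static int maxParallelismByInputRows(long scanRows, long exchangeRows, long buildRows,
      long minInputRowsPerThread, int hardCap) {
    long totalRows = scanRows + exchangeRows + buildRows;
    // Floor division keeps every instance at or above the per-thread minimum.
    long byWork = Math.max(1, totalRows / minInputRowsPerThread);
    return (int) Math.min(hardCap, byWork);
  }
}
```

The hard cap reflects the concern, raised later in the thread, that inaccurate stats could otherwise schedule an excessive number of instances.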
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has uploaded a new patch set (#45) to the change originally created by Qifan Chen. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. IMPALA-11604 Planner changes for CPU usage This patch augments IMPALA-10992 by establishing an infrastructure to allow the weighted total amount of data to process to be used as a new factor in the definition and selection of an executor group. As the basis of the CPU costing model, we define ProcessingCost as the cost for a distinct PlanNode / DataSink / PlanFragment to process its input rows globally across all of its instances. The costing algorithm then tries to adjust the number of instances for each fragment by considering their production-consumption ratio, and finally returns a number representing an ideal CPU core count required for a query to run efficiently. A more detailed explanation of the CPU costing algorithm can be found in the four steps below. I. Compute ProcessingCost for each plan node and data sink. The ProcessingCost of a PlanNode/DataSink is the weighted amount of data processed by that node/sink. The basic ProcessingCost is computed with the following general formula. ProcessingCost is a pair PC(D, N), where D = I * (C + M) and: D is the weighted amount of data processed; I is the input cardinality; C is the expression evaluation cost per row, set to the total weight of expression evaluation in the node/sink; M is the materialization cost per row, used only by scan and exchange nodes (otherwise 0); N is the number of instances, which defaults to D / MIN_COST_PER_THREAD (1 million), but is fixed for certain nodes/sinks and adjustable in step III. In this patch, the weight of each expression evaluation is set to a constant of 1. A description of the computation for each kind of PlanNode/DataSink is given below. 01. 
AggregationNode: Each AggregateInfo has its C as the sum of its grouping and aggregate expression costs, and is then assigned its own ProcessingCost. These ProcessingCosts are then summed to form the AggregationNode's ProcessingCost; 02. AnalyticEvalNode: C is the sum of the evaluation costs for analytic functions; 03. CardinalityCheckNode: Use the general formula, I = 1; 04. DataSourceScanNode: Follow the formula from the superclass ScanNode; 05. EmptySetNode: I = 0; 06. ExchangeNode: M = 1 / row batch size. In broadcast mode, the general formula is modified to D = D * number of receivers; 07. HashJoinNode: probe cost = PC(I0 * C(equiJoin predicate), N) + PC(output cardinality * C(otherJoin predicate), N); build cost = PC(I1 * C(equiJoin predicate), N), with I0 and I1 as the input cardinalities of the probe and build sides, respectively. If the plan node does not have a separate build, ProcessingCost is the sum of the probe cost and build cost. Otherwise, ProcessingCost is equal to the probe cost. 08. HbaseScanNode, HdfsScanNode, and KuduScanNode: Follow the formula from the superclass ScanNode; 09. Nested loop join node: When the right child is not a SingularRowSrcNode: probe cost = PC(I0 * C(equiJoin predicate), N) + PC(output cardinality * C(otherJoin predicate), N); build cost = PC(I1 * C(equiJoin predicate), N). When the right child is a SingularRowSrcNode: probe cost = PC(I0, N); build cost = PC(I0 * I1, N). I0 and I1 are the input cardinalities of the probe and build sides, respectively. If the plan node does not have a separate build, ProcessingCost is the sum of the probe cost and build cost. Otherwise, ProcessingCost is equal to the probe cost. 10. ScanNode: M = 1 / row batch size; 11. SelectNode: Use the general formula; 12. SingularRowSrcNode: Since the node is involved once per input in a nested loop join, its contribution is computed in the nested loop join; 13. SortNode: C is the evaluation cost for the sort expression; 14. SubplanNode: C is 1. 
I is the product of the cardinalities of the left and right children; 15. Union node: C is the cost of result expression evaluation from all non-pass-through children; 16. Unnest node: I is the cardinality of the containing SubplanNode and C is 1; 17. DataStreamSink: M = 1 / num rows per batch; 18. JoinBuildSink: ProcessingCost is the build cost of its associated JoinNode; 19. PlanRootSink: If result spooling is enabled, C is the cost of output expression evaluation. Otherwise, ProcessingCost is zero; 20. TableSink: C is the cost of output expression evaluation. TableSink subclasses (including HBaseTableSink, HdfsTableSink, and KuduTableSink) follow the same formula. II. Compute the total ProcessingCost of a fragment. A query fragment
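Item 07 (HashJoinNode) splits the join cost into a probe cost and a build cost and decides which of them the node itself carries. The Java sketch below models only the D component of each PC term. It assumes, beyond what the commit message states, that the two PC terms of the probe cost share the same N, so their D values can simply be added. Class and method names are hypothetical.

```java
/**
 * Hypothetical sketch of item 07 (HashJoinNode). Only D is modeled; the two
 * probe-side PC terms are assumed to share N, so their D values are summed.
 */
final class HashJoinCostSketch {
  final double probeCost;  // I0 * C(equiJoin) + output cardinality * C(otherJoin)
  final double buildCost;  // I1 * C(equiJoin)

  HashJoinCostSketch(long probeCard, long buildCard, long outputCard,
      double equiJoinCostPerRow, double otherJoinCostPerRow) {
    this.probeCost = probeCard * equiJoinCostPerRow + outputCard * otherJoinCostPerRow;
    this.buildCost = buildCard * equiJoinCostPerRow;
  }

  /** Cost charged to the join node itself. */
  double nodeCost(boolean hasSeparateBuild) {
    // With a separate build, the build cost is charged to the JoinBuildSink (item 18).
    return hasSeparateBuild ? probeCost : probeCost + buildCost;
  }
}
```

For example, a join with 1,000 probe rows, 200 build rows, 500 output rows, an equi-join cost of 2, and an other-join cost of 1 has a probe cost of 2,500 and a build cost of 400. The node carries 2,900 without a separate build and 2,500 with one.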
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Qifan Chen has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 44: (11 comments) http://gerrit.cloudera.org:8080/#/c/19033/44//COMMIT_MSG Commit Message: http://gerrit.cloudera.org:8080/#/c/19033/44//COMMIT_MSG@16 PS44, Line 16: blocking-operator nature May need to explain why blocking operators are considered in the context of DoP adjustment. Conceptually, blocking operators can be divided into multiple tiers, with those in the 1st tier close to the leaf nodes and able to run right away. Those in the 2nd tier can run when all their dependent children (including the blocking operators in the 1st tier) can provide data. I did not see a linkage to increasing DoP for blocking operators. http://gerrit.cloudera.org:8080/#/c/19033/44//COMMIT_MSG@19 PS44, Line 19: explained nit found http://gerrit.cloudera.org:8080/#/c/19033/37/be/src/util/backend-gflag-util.cc File be/src/util/backend-gflag-util.cc: http://gerrit.cloudera.org:8080/#/c/19033/37/be/src/util/backend-gflag-util.cc@201 PS37, Line 201: 0.5 > My intention is to associate the scaling flag to CPU requirement of the que The scaling factor as defined is less intuitive, since one has to inverse it to understand its semantics. I think you can define the true scaling factor to be = 1 / . http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/PlanFragment.java File fe/src/main/java/org/apache/impala/planner/PlanFragment.java: http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/PlanFragment.java@147 PS37, Line 147: have an adjusted number of instance based on : // Processin nit. A positive value implies the instance count has been adjusted. It would also be nice to provide an example here. 
http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/PlanFragment.java@864 PS37, Line 864: elism, numNode Better renamed as getMaxParallelismByTotalWorkSize(). http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/PlanFragment.java@883 PS37, Line 883: : protected boolean hasAdjustedInstanceCount() { return adjustedInstanceCount_ > 0; } : : protected void setFixedInstanceCount(int count) { : isFixedParallelism_ = true; : setAdjustedInstanceCount(count); : } Repeated use from line 869. Can be refactored. http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/PlanFragment.java@957 PS37, Line 957: processingCosts_.get(index).getNumInstanceExpected()); Should add a comment. http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/PlanFragment.java@972 PS37, Line 972: : // Compute exchanging child parallelism first. I wonder if the computation can be improved here e.g. by the size of the work. Making it to the max # of nodes can overuse the system resource. In general, I wonder if this logic tries to fix some bugs in DoP computation. Adjusting DoP specifically for plans with blocking operators seems odd. See my comment to the commit message. http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/PlanFragment.java@1098 PS37, Line 1098: if (hasBlockingNode()) { add a comment should be helpful. http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/PlanFragment.java@1113 PS37, Line 1113: does not sound right. 
http://gerrit.cloudera.org:8080/#/c/19033/44/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test File testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test: http://gerrit.cloudera.org:8080/#/c/19033/44/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test@75 PS44, Line 75: cardinality=3.04K Is it possible to show the new processing cost here too? It will be wonderful. -- To view, visit http://gerrit.cloudera.org:8080/19033 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 44 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Wed, 08 Feb 2023 15:54:25 + Gerrit-HasComments: Yes
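The review asks for an example of the adjusted-instance-count convention ("a positive value implies the instance count has been adjusted"). The Java sketch below is a simplified, hypothetical stand-in built only around the quoted members adjustedInstanceCount_, isFixedParallelism_, hasAdjustedInstanceCount(), and setFixedInstanceCount(). The -1 default and the validation in setAdjustedInstanceCount() are assumptions of the sketch; this is not the real PlanFragment class.

```java
/**
 * Hypothetical, simplified stand-in for the instance-count fields quoted
 * from PlanFragment.java; not the real class.
 */
final class FragmentInstanceSketch {
  // Assumed default: -1 means "not adjusted"; any positive value means adjusted.
  private int adjustedInstanceCount_ = -1;
  // True when later adjustment should treat the count as fixed.
  private boolean isFixedParallelism_ = false;

  boolean hasAdjustedInstanceCount() { return adjustedInstanceCount_ > 0; }

  int getAdjustedInstanceCount() { return adjustedInstanceCount_; }

  boolean isFixedParallelism() { return isFixedParallelism_; }

  void setAdjustedInstanceCount(int count) {
    // Assumption: only positive counts are meaningful.
    if (count < 1) throw new IllegalArgumentException("count must be positive");
    adjustedInstanceCount_ = count;
  }

  void setFixedInstanceCount(int count) {
    isFixedParallelism_ = true;
    setAdjustedInstanceCount(count);
  }
}
```

For example, a fragment that must run as a single instance would call setFixedInstanceCount(1). After that call, hasAdjustedInstanceCount() returns true and isFixedParallelism() reports that the count should not be changed.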
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 44: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/12332/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- To view, visit http://gerrit.cloudera.org:8080/19033 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 44 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Tue, 07 Feb 2023 23:55:53 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 44: (19 comments) http://gerrit.cloudera.org:8080/#/c/19033/43//COMMIT_MSG Commit Message: http://gerrit.cloudera.org:8080/#/c/19033/43//COMMIT_MSG@209 PS43, Line 209: > Should use THREADS instead of INSTANCES in naming if this is sizing local t Done http://gerrit.cloudera.org:8080/#/c/19033/43//COMMIT_MSG@213 PS43, Line 213: backend-gflag-util.cc. > We can rework this to be used as a starting count. However, I do think that Done http://gerrit.cloudera.org:8080/#/c/19033/43//COMMIT_MSG@244 PS43, Line 244: > Add Done http://gerrit.cloudera.org:8080/#/c/19033/43/be/src/scheduling/scheduler.cc File be/src/scheduling/scheduler.cc: http://gerrit.cloudera.org:8080/#/c/19033/43/be/src/scheduling/scheduler.cc@270 PS43, Line 270: if (ContainsUnionNode(fragment_state->fragment.plan) > Add comment/TODO to explain why these cases are skipped. Done http://gerrit.cloudera.org:8080/#/c/19033/43/be/src/service/query-options.cc File be/src/service/query-options.cc: http://gerrit.cloudera.org:8080/#/c/19033/43/be/src/service/query-options.cc@1071 PS43, Line 1071: if (result != StringParser::PARSE_SUCCESS || max_num < 1 || max_num > 128) { > Change INSTANCES to THREADS Done http://gerrit.cloudera.org:8080/#/c/19033/43/be/src/service/query-options.cc@1077 PS43, Line 1077: break; > 64 may be tool low of a limit. 96 and 128 core CPUs are becoming more commo Done http://gerrit.cloudera.org:8080/#/c/19033/43/be/src/util/backend-gflag-util.cc File be/src/util/backend-gflag-util.cc: http://gerrit.cloudera.org:8080/#/c/19033/43/be/src/util/backend-gflag-util.cc@203 PS43, Line 203: "Valid value is > 0.0 and <= 10.0. 
Default value is 1."); > The '(' sign before 0.0 meant to be an exclusion, while ']' sign after 10.0 Done http://gerrit.cloudera.org:8080/#/c/19033/43/be/src/util/backend-gflag-util.cc@205 PS43, Line 205: DEFINE_bool_hidden(processing_cost_equal_expr_weight, false, > This default to true because I have not really explore if evaluation costin I don't see major changes from setting this to false. Changed the default to false. http://gerrit.cloudera.org:8080/#/c/19033/43/be/src/util/backend-gflag-util.cc@210 PS43, Line 210: // TODO: Benchmark and tune this config with an optimal value. > The end goal of this flag it to cap maximum number of instances by requirin Renamed this to min_input_rows_per_thread. It now relies on the number of input rows (rows coming in from exchange + scan + join build) rather than on ProcessingCost units. http://gerrit.cloudera.org:8080/#/c/19033/38/common/thrift/Frontend.thrift File common/thrift/Frontend.thrift: http://gerrit.cloudera.org:8080/#/c/19033/38/common/thrift/Frontend.thrift@769 PS38, Line 769: // The optional max_mem_limit to determine which executor group set to run for a query. > This comment seems out of place? Restored the old comment in a newer patch set. http://gerrit.cloudera.org:8080/#/c/19033/43/fe/src/main/java/org/apache/impala/planner/AggregationNode.java File fe/src/main/java/org/apache/impala/planner/AggregationNode.java: http://gerrit.cloudera.org:8080/#/c/19033/43/fe/src/main/java/org/apache/impala/planner/AggregationNode.java@85 PS43, Line 85: > Make this a local variable instead. Done http://gerrit.cloudera.org:8080/#/c/19033/43/fe/src/main/java/org/apache/impala/planner/CoreRequirement.java File fe/src/main/java/org/apache/impala/planner/CoreRequirement.java: http://gerrit.cloudera.org:8080/#/c/19033/43/fe/src/main/java/org/apache/impala/planner/CoreRequirement.java@59 PS43, Line 59: total_ = counts_.stream().mapToInt(v -> v).sum(); > Will replace it with regular stream. 
Done http://gerrit.cloudera.org:8080/#/c/19033/43/fe/src/main/java/org/apache/impala/planner/CoreRequirement.java@89 PS43, Line 89: protected static CoreRequirement sum(List cores) { > Will change to protected. Done http://gerrit.cloudera.org:8080/#/c/19033/43/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java File fe/src/main/java/org/apache/impala/planner/ExchangeNode.java: http://gerrit.cloudera.org:8080/#/c/19033/43/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java@263 PS43, Line 263: return deferredBatchQueueSize; > Any formula suggestion to replace this? If it is a fixed per-row cost, then Changed the estimated cost per row to 1 / row batch size. http://gerrit.cloudera.org:8080/#/c/19033/43/fe/src/main/java/org/apache/impala/planner/PlanFragment.java File fe/src/main/java/org/apache/impala/planner/PlanFragment.java: http://gerrit.cloudera.org:8080/#/c/19033/43/fe/src/main/java/org/apache/impala/planner/PlanFragment.java@273 PS43, Line 273: st.setNumRowToProdu > This seems to be flawed for case of preaggregation streaming node. The impl Done
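The CoreRequirement snippets above compute total_ from counts_ with a sequential stream and expose a protected static sum(List cores). The Java sketch below reproduces that shape in a self-contained class. Only the total_ computation comes from the review. Treating sum() as a concatenation of the per-input counts is an assumption. The list's generic type is elided in the quoted snippet, so List<CoreRequirementSketch> here is illustrative.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical stand-in for CoreRequirement; only the total_ formula is taken
 * from the review snippet.
 */
final class CoreRequirementSketch {
  private final List<Integer> counts_;
  private final int total_;

  CoreRequirementSketch(List<Integer> counts) {
    counts_ = Collections.unmodifiableList(new ArrayList<>(counts));
    // Sequential (regular) stream, matching the quoted line.
    total_ = counts_.stream().mapToInt(v -> v).sum();
  }

  int getTotal() { return total_; }

  List<Integer> getCounts() { return counts_; }

  /** Assumed semantics: combine requirements by concatenating their counts. */
  protected static CoreRequirementSketch sum(List<CoreRequirementSketch> cores) {
    List<Integer> all = new ArrayList<>();
    for (CoreRequirementSketch c : cores) all.addAll(c.counts_);
    return new CoreRequirementSketch(all);
  }
}
```

Keeping the counts list alongside the total preserves which inputs contributed to the requirement, which a plain integer sum would lose.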
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has uploaded a new patch set (#44) to the change originally created by Qifan Chen. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. IMPALA-11604 Planner changes for CPU usage This patch augments IMPALA-10992 by establishing an infrastructure to allow the weighted total amount of data to process to be used as a new factor in the definition and selection of an executor group. As the basis of the CPU costing model, we define ProcessingCost as the cost for a distinct PlanNode / DataSink / PlanFragment to process its input rows globally across all of its instances. The costing algorithm then tries to adjust the number of instances for each fragment by considering their production-consumption ratio and blocking-operator nature between their plan nodes, and finally then returns a number representing an ideal CPU core count required for a query to run efficiently. A more detailed explanation of the CPU costing algorithm can be explained in four steps below. I. Compute ProcessingCost for each plan node and data sink. The ProcessingCost of a PlanNode/DataSink is the weighted amount of data processed by that node/sink. The basic ProcessingCost is computed with the following general formula. ProcessingCost is a pair PC(D, N), where D = I * (C + M) and: D is the weighted amount of data processed; I is the input cardinality; C is the expression evaluation cost per row, set to the total weight of expression evaluation in the node/sink; M is the materialization cost per row, used only by scan and exchange nodes (otherwise 0); N is the number of instances, which defaults to D / MIN_COST_PER_THREAD (1 million), but is fixed for certain nodes/sinks and adjustable in step III. In this patch, the weight of each expression evaluation is set to a constant of 1. A description of the computation for each kind of PlanNode/DataSink is given below. 01. 
AggregationNode: Each AggregateInfo has its C as the sum of its grouping and aggregate expression costs, and is then assigned its own ProcessingCost. These ProcessingCosts are then summed to form the AggregationNode's ProcessingCost; 02. AnalyticEvalNode: C is the sum of the evaluation costs for analytic functions; 03. CardinalityCheckNode: Use the general formula, I = 1; 04. DataSourceScanNode: Follow the formula from the superclass ScanNode; 05. EmptySetNode: I = 0; 06. ExchangeNode: M = 1 / row batch size. In broadcast mode, the general formula is modified to D = D * number of receivers; 07. HashJoinNode: probe cost = PC(I0 * C(equiJoin predicate), N) + PC(output cardinality * C(otherJoin predicate), N); build cost = PC(I1 * C(equiJoin predicate), N), with I0 and I1 as the input cardinalities of the probe and build sides, respectively. If the plan node does not have a separate build, ProcessingCost is the sum of the probe cost and build cost. Otherwise, ProcessingCost is equal to the probe cost. 08. HbaseScanNode, HdfsScanNode, and KuduScanNode: Follow the formula from the superclass ScanNode; 09. Nested loop join node: When the right child is not a SingularRowSrcNode: probe cost = PC(I0 * C(equiJoin predicate), N) + PC(output cardinality * C(otherJoin predicate), N); build cost = PC(I1 * C(equiJoin predicate), N). When the right child is a SingularRowSrcNode: probe cost = PC(I0, N); build cost = PC(I0 * I1, N). I0 and I1 are the input cardinalities of the probe and build sides, respectively. If the plan node does not have a separate build, ProcessingCost is the sum of the probe cost and build cost. Otherwise, ProcessingCost is equal to the probe cost. 10. ScanNode: M = 1 / row batch size; 11. SelectNode: Use the general formula; 12. SingularRowSrcNode: Since the node is involved once per input in a nested loop join, its contribution is computed in the nested loop join; 13. SortNode: C is the evaluation cost for the sort expression; 14. SubplanNode: C is 1. 
I is the product of the cardinalities of the left and right children; 15. Union node: C is the cost of result expression evaluation from all non-pass-through children; 16. Unnest node: I is the cardinality of the containing SubplanNode and C is 1. 17. DataStreamSink: M = 1 / num rows per batch. 18. JoinBuildSink: ProcessingCost is the build cost of its associated JoinNode. 19. PlanRootSink: If result spooling is enabled, C is the cost of output expression evaluation. Otherwise, ProcessingCost is zero. 20. TableSink: C is the cost of output expression evaluation. TableSink subclasses (including HBaseTableSink, HdfsTableSink, and KuduTableSink) follow the same formula; II. Compute the
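The general step I formula can be sketched as plain arithmetic. This is a minimal illustration, not the patch's code: the class and method names are hypothetical, and rounding the default N up with a floor of one instance is an assumption, since the message only says N defaults to D / MIN_COST_PER_THREAD.

```java
// Hypothetical sketch of the step I ProcessingCost pair PC(D, N).
// Names are illustrative only; they are not Impala's actual planner classes.
final class ProcessingCostSketch {
  // The commit message quotes MIN_COST_PER_THREAD as 1 million.
  static final double MIN_COST_PER_THREAD = 1_000_000.0;

  final double weightedData; // D
  final int numInstances;    // N

  ProcessingCostSketch(double weightedData, int numInstances) {
    this.weightedData = weightedData;
    this.numInstances = numInstances;
  }

  // D = I * (C + M)
  static double weightedData(double inputCardinality, double exprCostPerRow,
      double materializationCostPerRow) {
    return inputCardinality * (exprCostPerRow + materializationCostPerRow);
  }

  // Default N = D / MIN_COST_PER_THREAD. Rounding up and the lower bound of one
  // instance are assumptions of this sketch.
  static int defaultNumInstances(double weightedData) {
    return (int) Math.max(1.0, Math.ceil(weightedData / MIN_COST_PER_THREAD));
  }

  // Scan and exchange nodes in this patch set: M = 1 / row batch size.
  static double materializationCost(int rowBatchSize) {
    return 1.0 / rowBatchSize;
  }

  // Broadcast exchange: D = D * number of receivers.
  static double broadcastWeightedData(double weightedData, int numReceivers) {
    return weightedData * numReceivers;
  }

  // General formula combined with the default N.
  static ProcessingCostSketch general(double inputCardinality, double exprCostPerRow,
      double materializationCostPerRow) {
    double d = weightedData(inputCardinality, exprCostPerRow, materializationCostPerRow);
    return new ProcessingCostSketch(d, defaultNumInstances(d));
  }
}
```

For example, an exchange receiving 1,024,000 rows with C = 0 and a 1024-row batch gives `general(1_024_000, 0.0, materializationCost(1024))` with D = 1000 and, under the assumed rounding, N = 1.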
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 43: (11 comments) Hi Kurt, thank you for your review. I'm replying to some of your comments for further discussion. http://gerrit.cloudera.org:8080/#/c/19033/43//COMMIT_MSG Commit Message: http://gerrit.cloudera.org:8080/#/c/19033/43//COMMIT_MSG@213 PS43, Line 213:option. Setting 0 means MT_DOP will be used to count the maximum > Need to rework these options so that the core count for the instance is the We can rework this to be used as a starting count. However, I do think that a hard cap config is still required to prevent an excessively high number of instances from being scheduled due to inaccurate stats. We can use the min_processing_cost_per_thread flag to derive the max cap once we figure out a good default value for it. http://gerrit.cloudera.org:8080/#/c/19033/43/be/src/scheduling/scheduler.cc File be/src/scheduling/scheduler.cc: http://gerrit.cloudera.org:8080/#/c/19033/43/be/src/scheduling/scheduler.cc@552 PS43, Line 552: int max_instances = input_fragment_state.instance_states.size(); > This assignment is always overwritten. Is the value meaningful? Not always. This is the correct assignment if IsExceedMaxFsWriter returns false and COMPUTE_PROCESSING_COST=0. http://gerrit.cloudera.org:8080/#/c/19033/43/be/src/util/backend-gflag-util.cc File be/src/util/backend-gflag-util.cc: http://gerrit.cloudera.org:8080/#/c/19033/43/be/src/util/backend-gflag-util.cc@203 PS43, Line 203: "Valid value is between (0.0, 10.0]. Default value is 1."); > Suggest >0 lower limit. The '(' before 0.0 is meant to denote exclusion, while the ']' after 10.0 denotes inclusion. Will spell it out in the next patch set. http://gerrit.cloudera.org:8080/#/c/19033/43/be/src/util/backend-gflag-util.cc@205 PS43, Line 205: DEFINE_bool_hidden(processing_cost_equal_expr_weight, true, > Why is the default true here?
Also, are the width and datatype considered s This defaults to true because I have not really explored whether the evaluation costing from IMPALA-2805 is a good fit for the CPU costing algorithm as well. It was originally added to determine filtering priority order. I can try testing with it set to false and see how it goes. http://gerrit.cloudera.org:8080/#/c/19033/43/be/src/util/backend-gflag-util.cc@210 PS43, Line 210: DEFINE_int64_hidden(min_processing_cost_per_thread, 100, > This may not be aggressive enough of a default. Is this number backed by me The end goal of this flag is to cap the maximum number of instances by requiring each instance to process at least this many ProcessingCost units. With this value on tpcds_3000_parquet, I can see that some fragments still have a high max cap (high hundreds to low thousands), while the number of executors is only 10 and MT_DOP=12. Any scale-down (below MT_DOP) that happened was mostly driven by the consume-produce rate comparison. So I'd argue for increasing this value to further lower the high max cap. On the other hand, ProcessingCost is not exposed to users anywhere other than EXPLAIN_LEVEL=VERBOSE. This will confuse users when they need to tune it. I'm thinking of changing the flag to be based on input row cardinality instead. I think that will be easier to reason about and tune, since the input row cardinality is shown in the query profile. http://gerrit.cloudera.org:8080/#/c/19033/43/fe/src/main/java/org/apache/impala/planner/CoreRequirement.java File fe/src/main/java/org/apache/impala/planner/CoreRequirement.java: http://gerrit.cloudera.org:8080/#/c/19033/43/fe/src/main/java/org/apache/impala/planner/CoreRequirement.java@59 PS43, Line 59: total_ = counts_.parallelStream().mapToInt(v -> v).sum(); > Is this List really bid enough for parallelStream? Will replace it with a regular stream.
http://gerrit.cloudera.org:8080/#/c/19033/43/fe/src/main/java/org/apache/impala/planner/CoreRequirement.java@89 PS43, Line 89: public static CoreRequirement sum(List cores) { > Exposing different aggregate values here might blur the usage of this objec Will change to protected. http://gerrit.cloudera.org:8080/#/c/19033/43/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java File fe/src/main/java/org/apache/impala/planner/ExchangeNode.java: http://gerrit.cloudera.org:8080/#/c/19033/43/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java@263 PS43, Line 263:* Estimate per-row cost as 1 / num rows per batch. > Cost seems too low here. Likely more fixed per-row cost. Any formula suggestions to replace this? If it is a fixed per-row cost, does the average row size not matter? http://gerrit.cloudera.org:8080/#/c/19033/43/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java File fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java: http://gerrit.cloudera.org:8080/#/c/19033/43/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java@384
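The cap Riza describes for min_processing_cost_per_thread ("requiring each instance to process at least this many ProcessingCost units") amounts to a floor division of the fragment's total cost. The sketch below is hypothetical: the class name, method name, and the sample totals are illustrative, not taken from the patch.

```java
// Hypothetical sketch: cap a fragment's instance count so that every instance
// is left with at least minCostPerThread units of ProcessingCost.
final class InstanceCapSketch {
  static int maxInstances(long totalCost, long minCostPerThread) {
    if (minCostPerThread <= 0) {
      throw new IllegalArgumentException("minCostPerThread must be > 0");
    }
    // Floor division: with n = totalCost / minCostPerThread instances, each one
    // still gets totalCost / n >= minCostPerThread.
    long cap = totalCost / minCostPerThread;
    // Allow at least one instance and keep the value within int range.
    return (int) Math.max(1L, Math.min(cap, (long) Integer.MAX_VALUE));
  }
}
```

With an illustrative total cost of 120,000, a threshold of 100 yields a cap of 1200, while raising it to 1000 yields 120 (the 10 executors x MT_DOP=12 scale mentioned above). This shows why increasing the flag lowers the high max caps.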
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Kurt Deschler has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 43: (20 comments) http://gerrit.cloudera.org:8080/#/c/19033/43//COMMIT_MSG Commit Message: http://gerrit.cloudera.org:8080/#/c/19033/43//COMMIT_MSG@209 PS43, Line 209: 3. PROCESSING_COST_MAX_INSTANCES Should use THREADS instead of INSTANCES in naming if this is sizing local to an impalad instance. http://gerrit.cloudera.org:8080/#/c/19033/43//COMMIT_MSG@213 PS43, Line 213:option. Setting 0 means MT_DOP will be used to count the maximum Need to rework these options so that the core count for the instance is the default limit and that can be overridden higher or lower. There isn't much point in preserving mt_dop values when compute_processing_cost is set, as the sizing uses a different model. http://gerrit.cloudera.org:8080/#/c/19033/43//COMMIT_MSG@244 PS43, Line 244: Add Co-authored-by: Riza.. http://gerrit.cloudera.org:8080/#/c/19033/43/be/src/scheduling/scheduler.cc File be/src/scheduling/scheduler.cc: http://gerrit.cloudera.org:8080/#/c/19033/43/be/src/scheduling/scheduler.cc@270 PS43, Line 270: if (ContainsUnionNode(fragment_state->fragment.plan) Add comment/TODO to explain why these cases are skipped. http://gerrit.cloudera.org:8080/#/c/19033/43/be/src/scheduling/scheduler.cc@552 PS43, Line 552: int max_instances = input_fragment_state.instance_states.size(); This assignment is always overwritten. Is the value meaningful? http://gerrit.cloudera.org:8080/#/c/19033/43/be/src/service/query-options.cc File be/src/service/query-options.cc: http://gerrit.cloudera.org:8080/#/c/19033/43/be/src/service/query-options.cc@1071 PS43, Line 1071: case TImpalaQueryOptions::PROCESSING_COST_MAX_INSTANCES: { Change INSTANCES to THREADS. http://gerrit.cloudera.org:8080/#/c/19033/43/be/src/service/query-options.cc@1077 PS43, Line 1077:"Valid values are in [0, 64].", 64 may be too low a limit.
96- and 128-core CPUs are becoming more common. http://gerrit.cloudera.org:8080/#/c/19033/43/be/src/util/backend-gflag-util.cc File be/src/util/backend-gflag-util.cc: http://gerrit.cloudera.org:8080/#/c/19033/43/be/src/util/backend-gflag-util.cc@203 PS43, Line 203: "Valid value is between (0.0, 10.0]. Default value is 1."); Suggest a >0 lower limit. http://gerrit.cloudera.org:8080/#/c/19033/43/be/src/util/backend-gflag-util.cc@205 PS43, Line 205: DEFINE_bool_hidden(processing_cost_equal_expr_weight, true, Why is the default true here? Also, are the width and datatype considered separately? http://gerrit.cloudera.org:8080/#/c/19033/43/be/src/util/backend-gflag-util.cc@210 PS43, Line 210: DEFINE_int64_hidden(min_processing_cost_per_thread, 100, This default may not be aggressive enough. Is this number backed by measured results? http://gerrit.cloudera.org:8080/#/c/19033/38/common/thrift/Frontend.thrift File common/thrift/Frontend.thrift: http://gerrit.cloudera.org:8080/#/c/19033/38/common/thrift/Frontend.thrift@769 PS38, Line 769: // The optional max_mem_limit to determine which executor group set to run for a query. This comment seems out of place? http://gerrit.cloudera.org:8080/#/c/19033/43/fe/src/main/java/org/apache/impala/planner/AggregationNode.java File fe/src/main/java/org/apache/impala/planner/AggregationNode.java: http://gerrit.cloudera.org:8080/#/c/19033/43/fe/src/main/java/org/apache/impala/planner/AggregationNode.java@85 PS43, Line 85: private List processingCosts_; Make this a local variable instead. http://gerrit.cloudera.org:8080/#/c/19033/43/fe/src/main/java/org/apache/impala/planner/CoreRequirement.java File fe/src/main/java/org/apache/impala/planner/CoreRequirement.java: http://gerrit.cloudera.org:8080/#/c/19033/43/fe/src/main/java/org/apache/impala/planner/CoreRequirement.java@59 PS43, Line 59: total_ = counts_.parallelStream().mapToInt(v -> v).sum(); Is this List really big enough for parallelStream?
http://gerrit.cloudera.org:8080/#/c/19033/43/fe/src/main/java/org/apache/impala/planner/CoreRequirement.java@89 PS43, Line 89: public static CoreRequirement sum(List cores) { Exposing different aggregate values here might blur the usage of this object. Do these interfaces all need to be public? http://gerrit.cloudera.org:8080/#/c/19033/43/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java File fe/src/main/java/org/apache/impala/planner/ExchangeNode.java: http://gerrit.cloudera.org:8080/#/c/19033/43/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java@263 PS43, Line 263:* Estimate per-row cost as 1 / num rows per batch. Cost seems too low here. Likely more fixed per-row cost. http://gerrit.cloudera.org:8080/#/c/19033/43/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java File
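The interval notation discussed for the flag at backend-gflag-util.cc line 203 ("(0.0, 10.0]") and for the query option range "[0, 64]" can be made concrete with two small checks. This is a hypothetical sketch written in Java for consistency with the other frontend examples. The real validation lives in the C++ backend and is not shown in the review, so the class and method names here are illustrative only.

```java
// Hypothetical range checks spelling out the interval notation from the review:
// "(0.0, 10.0]" excludes 0.0 and includes 10.0; "[0, 64]" includes both ends.
final class RangeCheckSketch {
  // Left-open interval (low, high]. NaN fails both comparisons and is rejected.
  static boolean inLeftOpenRange(double value, double lowExclusive, double highInclusive) {
    return value > lowExclusive && value <= highInclusive;
  }

  // Closed interval [low, high].
  static boolean inClosedRange(long value, long low, long high) {
    return value >= low && value <= high;
  }

  // Error text in the spirit of the "Valid value is between (0.0, 10.0]" message.
  static String describeLeftOpen(double low, double high) {
    return "Valid value is between (" + low + ", " + high + "].";
  }
}
```

Under this reading, 0.0 is rejected and 10.0 is accepted for the flag, which is what Kurt's ">0 lower limit" suggestion asks for. Raising the query option's upper bound to cover 96- or 128-core hosts would only change the `high` argument.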
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 43: (3 comments) http://gerrit.cloudera.org:8080/#/c/19033/42/be/src/util/backend-gflag-util.cc File be/src/util/backend-gflag-util.cc: http://gerrit.cloudera.org:8080/#/c/19033/42/be/src/util/backend-gflag-util.cc@211 PS42, Line 211: um processing cost that a frag > Maybe add a TODO to set a range for this parameter to avoid unexpected beha Ack http://gerrit.cloudera.org:8080/#/c/19033/43/fe/src/main/java/org/apache/impala/planner/PlanFragment.java File fe/src/main/java/org/apache/impala/planner/PlanFragment.java: http://gerrit.cloudera.org:8080/#/c/19033/43/fe/src/main/java/org/apache/impala/planner/PlanFragment.java@273 PS43, Line 273: root.isBlockingNode This seems to be flawed for the case of a streaming preaggregation node. The implementation of isBlockingNode() in a streaming preaggregation node returns false. But during execution, when the preaggregation manages to aggregate all the way to the end (no rows passed through), the CPU costing algorithm should really consider it a blocking node. Doing so would make the costing algorithm set a higher instance count for the fragment right above it (which contains the final aggregation node). I confirmed this by setting DISABLE_STREAMING_PREAGGREGATIONS=1 when running TPCDS Q78 and Q79. With streaming preaggregation disabled, the fragment containing the final aggregation is scheduled with a higher instance count. http://gerrit.cloudera.org:8080/#/c/19033/43/fe/src/main/java/org/apache/impala/planner/PlanFragment.java@692 PS43, Line 692: builder.append("work-size="); : builder.append(getTotalWorkSize()); This is now redundant with the fragment's input cardinality, and the sizing is based on min_processing_cost_per_thread instead of the input row count. This can be removed.
-- To view, visit http://gerrit.cloudera.org:8080/19033 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 43 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Tue, 07 Feb 2023 00:32:29 + Gerrit-HasComments: Yes
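Riza's PlanFragment.java line 273 comment proposes treating a streaming preaggregation as blocking when it is expected to pass no rows through. The predicate below sketches that idea under stated assumptions. The node kinds, the static blocking classification, and the `expectedPassThroughRows` estimate are all hypothetical, not fields or methods of Impala's planner.

```java
// Hypothetical sketch of the refinement proposed above: a streaming
// preaggregation reports isBlockingNode() == false, but if it is expected to
// aggregate all of its input without passing rows through, the costing
// algorithm could treat it as blocking. Names are illustrative only.
final class BlockingSketch {
  enum NodeKind { SCAN, STREAMING_PREAGG, FINAL_AGG, SORT, OTHER }

  // Assumed static classification, used only for this illustration.
  static boolean isBlockingNode(NodeKind kind) {
    return kind == NodeKind.FINAL_AGG || kind == NodeKind.SORT;
  }

  // expectedPassThroughRows: an assumed planner estimate of rows the
  // preaggregation streams through un-aggregated.
  static boolean treatAsBlockingForCosting(NodeKind kind, long expectedPassThroughRows) {
    if (isBlockingNode(kind)) {
      return true;
    }
    return kind == NodeKind.STREAMING_PREAGG && expectedPassThroughRows == 0;
  }
}
```

Keeping the override separate from `isBlockingNode()` leaves execution-time behavior unchanged and only affects how the costing algorithm sizes the fragment above the preaggregation.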
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Wenzhe Zhou has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 43: (1 comment) http://gerrit.cloudera.org:8080/#/c/19033/42/be/src/util/backend-gflag-util.cc File be/src/util/backend-gflag-util.cc: http://gerrit.cloudera.org:8080/#/c/19033/42/be/src/util/backend-gflag-util.cc@211 PS42, Line 211: um processing cost that a frag > This is probably subject to tuning. The original intent is to assign "minim Maybe add a TODO to set a range for this parameter to avoid unexpected behavior. -- To view, visit http://gerrit.cloudera.org:8080/19033 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 43 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Mon, 06 Feb 2023 23:10:25 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 43: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/12318/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- To view, visit http://gerrit.cloudera.org:8080/19033 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 43 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Mon, 06 Feb 2023 23:09:15 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 43: (10 comments) http://gerrit.cloudera.org:8080/#/c/19033/42/be/src/scheduling/scheduler.h File be/src/scheduling/scheduler.h: http://gerrit.cloudera.org:8080/#/c/19033/42/be/src/scheduling/scheduler.h@441 PS42, Line 441: TPlanFragment.effective_ > nit: TPlanFragment.effective_instance_count. Otherwise, it's hard to find c Done http://gerrit.cloudera.org:8080/#/c/19033/42/be/src/util/backend-gflag-util.cc File be/src/util/backend-gflag-util.cc: http://gerrit.cloudera.org:8080/#/c/19033/42/be/src/util/backend-gflag-util.cc@206 PS42, Line 206: 1 > These are relative costs, right? "1" is the minimum cost? Yes, relative cost. 1 is the minimum. http://gerrit.cloudera.org:8080/#/c/19033/42/be/src/util/backend-gflag-util.cc@207 PS42, Line 207: p > nit: don't break line here Done http://gerrit.cloudera.org:8080/#/c/19033/42/be/src/util/backend-gflag-util.cc@211 PS42, Line 211: um processing cost that a frag > what's the normal range? why set default value as 100? This is probably subject to tuning. The original intent is to assign "minimum work" per thread, that is, 1 million rows per thread/fragment instance. "min_processing_cost_per_thread" scales roughly linearly with the number of rows per thread, but with the weights factored in as well (C and M, as explained in the commit message). I ran tests against tpcds_3000_parquet (10 nodes, mt_dop=12), and this config value seems to work fine for most queries. Reducing this value will cause the planner to schedule more fragment instances, while increasing it will reduce the number of fragment instances.
http://gerrit.cloudera.org:8080/#/c/19033/42/common/thrift/ImpalaService.thrift File common/thrift/ImpalaService.thrift: http://gerrit.cloudera.org:8080/#/c/19033/42/common/thrift/ImpalaService.thrift@767 PS42, Line 767: Allow CPU costing algorithm to schedule fragment instance count higher > The comment is different from the meaning of query option Fixed. http://gerrit.cloudera.org:8080/#/c/19033/42/common/thrift/ImpalaService.thrift@771 PS42, Line 771: q > Why the max value is 64? This follows the scale of MT_DOP. http://gerrit.cloudera.org:8080/#/c/19033/42/fe/src/main/java/org/apache/impala/planner/BroadcastProcessingCost.java File fe/src/main/java/org/apache/impala/planner/BroadcastProcessingCost.java: http://gerrit.cloudera.org:8080/#/c/19033/42/fe/src/main/java/org/apache/impala/planner/BroadcastProcessingCost.java@47 PS42, Line 47: childProcessingCost_ > Could we move this Precondition.checkState to constructor? I looked at the code again and think that multiple_ is redundant with numInstanceSupplier_ from the base class. I refactored the class accordingly and moved the Precondition to ProcessingCost.setNumInstanceExpected(). http://gerrit.cloudera.org:8080/#/c/19033/42/fe/src/main/java/org/apache/impala/planner/DataSink.java File fe/src/main/java/org/apache/impala/planner/DataSink.java: http://gerrit.cloudera.org:8080/#/c/19033/42/fe/src/main/java/org/apache/impala/planner/DataSink.java@68 PS42, Line 68: checkState(proc > Should we add Preconditions to check if processingCost_ is valid? Done http://gerrit.cloudera.org:8080/#/c/19033/42/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java File fe/src/main/java/org/apache/impala/planner/ExchangeNode.java: http://gerrit.cloudera.org:8080/#/c/19033/42/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java@265 PS42, Line 265: per row. > per row?
Done http://gerrit.cloudera.org:8080/#/c/19033/42/fe/src/main/java/org/apache/impala/util/ExprUtil.java File fe/src/main/java/org/apache/impala/util/ExprUtil.java: http://gerrit.cloudera.org:8080/#/c/19033/42/fe/src/main/java/org/apache/impala/util/ExprUtil.java@119 PS42, Line 119: e.getCost() : 1 > What's cost range? Is the value 1 the minimum value? It ranges from 1 to 10. IMPALA-2805 added these costs at https://github.com/apache/impala/blob/40da36414ff4d46b5cdc53f068b1f0a5b28a0f1d/fe/src/main/java/org/apache/impala/analysis/Expr.java#L79-L94 -- To view, visit http://gerrit.cloudera.org:8080/19033 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 43 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Mon, 06 Feb 2023 22:56:32 + Gerrit-HasComments: Yes
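The per-row evaluation cost C discussed above can be illustrated as a sum of expression weights. Under equal weighting (the behavior described for processing_cost_equal_expr_weight=true), every expression counts as 1. Otherwise, its IMPALA-2805 evaluation cost (1 to 10) is used. This is a hypothetical sketch: the `Expr` stand-in and the method names are illustrative, and the exact condition on ExprUtil.java line 119 is not shown in the review.

```java
import java.util.List;

// Hypothetical sketch of assembling the per-row evaluation cost C for a node/sink.
final class ExprWeightSketch {
  // Stand-in for an analyzed expression; evalCost mimics the 1..10 range
  // attributed to IMPALA-2805.
  static final class Expr {
    final String name;
    final double evalCost;

    Expr(String name, double evalCost) {
      this.name = name;
      this.evalCost = evalCost;
    }
  }

  // Equal weighting counts every expression as 1; otherwise use its evaluation cost.
  static double exprWeight(Expr e, boolean equalExprWeight) {
    return equalExprWeight ? 1.0 : e.evalCost;
  }

  // C = sum of the expression weights of a node/sink.
  static double perRowEvalCost(List<Expr> exprs, boolean equalExprWeight) {
    return exprs.stream().mapToDouble(e -> exprWeight(e, equalExprWeight)).sum();
  }
}
```

Because D = I * (C + M), switching from equal weights to per-expression costs can multiply D. For a fixed min_processing_cost_per_thread, that in turn raises the number of instances the planner considers, which is why the flag's default would likely need re-tuning if the weighting changes.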
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has uploaded a new patch set (#43) to the change originally created by Qifan Chen. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. IMPALA-11604 Planner changes for CPU usage This patch augments IMPALA-10992 by establishing an infrastructure to allow the weighted total amount of data to process to be used as a new factor in the definition and selection of an executor group. At the basis of the CPU costing model, we define ProcessingCost as the cost for a distinct PlanNode / DataSink / PlanFragment to process its input rows globally across all of its instances. The costing algorithm then tries to adjust the number of instances for each fragment by considering the production-consumption ratio and blocking-operator nature of their plan nodes, and finally returns a number representing the ideal CPU core count required for a query to run efficiently. The CPU costing algorithm is explained in more detail in the four steps below. I. Compute ProcessingCost for each plan node and data sink. The ProcessingCost of a PlanNode/DataSink is the weighted amount of data processed by that node/sink. The basic ProcessingCost is computed with the following general formula. ProcessingCost is a pair: PC(D, N), where D = I * (C + M), and: D is the weighted amount of data processed; I is the input cardinality; C is the expression evaluation cost per row, set to the total weight of expression evaluation in the node/sink; M is the materialization cost per row, used only by scan and exchange nodes (otherwise 0); N is the number of instances. N defaults to D / MIN_COST_PER_THREAD (1 million), but is fixed for certain nodes/sinks and adjustable in step III. In this patch, the weight of each expression evaluation is set to a constant of 1. A description of the computation for each kind of PlanNode/DataSink is given below. 01.
AggregationNode: Each AggregateInfo has its C computed as the sum of its grouping expressions and aggregate expressions, and is assigned a single ProcessingCost individually. These ProcessingCosts are then summed to form the AggregationNode's ProcessingCost; 02. AnalyticEvalNode: C is the sum of the evaluation costs for analytic functions; 03. CardinalityCheckNode: Use the general formula, I = 1; 04. DataSourceScanNode: Follow the formula from the superclass ScanNode; 05. EmptySetNode: I = 0; 06. ExchangeNode: M = 1 / num rows per batch. A modification of the general formula when in broadcast mode: D = D * number of receivers; 07. HashJoinNode: probe cost = PC(I0 * C(equiJoin predicate), N) + PC(output cardinality * C(otherJoin predicate), N); build cost = PC(I1 * C(equiJoin predicate), N), where I0 and I1 are the input cardinalities of the probe and build sides, respectively. If the plan node does not have a separate build, ProcessingCost is the sum of the probe cost and build cost. Otherwise, ProcessingCost is equal to the probe cost. 08. HbaseScanNode: Follow the formula from the superclass ScanNode; 09. HdfsScanNode and KuduScanNode: Follow the formula from the superclass ScanNode with a modified N: N is mt_dop when the query option mt_dop >= 1; otherwise, N is the number of nodes * max scan threads; 10. Nested loop join node: When the right child is not a SingularRowSrcNode: probe cost = PC(I0 * C(equiJoin predicate), N) + PC(output cardinality * C(otherJoin predicate), N); build cost = PC(I1 * C(equiJoin predicate), N). When the right child is a SingularRowSrcNode: probe cost = PC(I0, N); build cost = PC(I0 * I1, N). I0 and I1 are the input cardinalities of the probe and build sides, respectively. If the plan node does not have a separate build, ProcessingCost is the sum of the probe cost and build cost. Otherwise, ProcessingCost is equal to the probe cost. 11. ScanNode: M = average row size / ROWBATCH_MAX_MEM_USAGE (8MB); 12. SelectNode: Use the general formula; 13.
SingularRowSrcNode: Since the node is involved once per input in the nested loop join, its contribution is computed in the nested loop join; 14. SortNode: C is the evaluation cost for the sort expression; 15. SubplanNode: C is 1. I is the product of the cardinalities of the left and right children; 16. Union node: C is the cost of result expression evaluation from all non-pass-through children; 17. Unnest node: I is the cardinality of the containing SubplanNode and C is 1. 18. DataStreamSink: M = 1 / num rows per batch. 19. JoinBuildSink: ProcessingCost is the build cost of its associated JoinNode. 20. PlanRootSink: If result spooling is enabled, C is the cost of output expression evaluation. Otherwise.
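The join rules in items 07, 10, and 19 of this patch set combine probe and build costs depending on whether the join has a separate build. The sketch below tracks only the D component of each PC pair (N is omitted), and its class and method names are hypothetical.

```java
// Hypothetical sketch of the join cost composition (D component only).
final class JoinCostSketch {
  // probe cost = I0 * C(equiJoin) + outputCardinality * C(otherJoin)
  static double probeCost(double probeCardinality, double equiJoinCost,
      double outputCardinality, double otherJoinCost) {
    return probeCardinality * equiJoinCost + outputCardinality * otherJoinCost;
  }

  // build cost = I1 * C(equiJoin)
  static double buildCost(double buildCardinality, double equiJoinCost) {
    return buildCardinality * equiJoinCost;
  }

  // Without a separate build, the join node pays for both sides. With a separate
  // build, the build cost is charged to the JoinBuildSink instead.
  static double nodeCost(double probeCost, double buildCost, boolean hasSeparateBuild) {
    return hasSeparateBuild ? probeCost : probeCost + buildCost;
  }

  // Nested loop join whose right child is a SingularRowSrcNode:
  // probe cost = I0, build cost = I0 * I1.
  static double singularRowSrcProbeCost(double probeCardinality) {
    return probeCardinality;
  }

  static double singularRowSrcBuildCost(double probeCardinality, double buildCardinality) {
    return probeCardinality * buildCardinality;
  }
}
```

For example, with I0 = 1000, C(equiJoin) = 1, an output cardinality of 100, and C(otherJoin) = 2, the probe cost is 1200. With I1 = 500, the build cost is 500. The join node is then charged 1700 without a separate build, or 1200 when the build is a separate JoinBuildSink.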
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Wenzhe Zhou has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 42: (10 comments) http://gerrit.cloudera.org:8080/#/c/19033/42/be/src/scheduling/scheduler.h File be/src/scheduling/scheduler.h: http://gerrit.cloudera.org:8080/#/c/19033/42/be/src/scheduling/scheduler.h@441 PS42, Line 441: effective_instance_count nit: TPlanFragment.effective_instance_count. Otherwise, it's hard to find context here. http://gerrit.cloudera.org:8080/#/c/19033/42/be/src/util/backend-gflag-util.cc File be/src/util/backend-gflag-util.cc: http://gerrit.cloudera.org:8080/#/c/19033/42/be/src/util/backend-gflag-util.cc@206 PS42, Line 206: 1 These are relative costs, right? "1" is the minimum cost? http://gerrit.cloudera.org:8080/#/c/19033/42/be/src/util/backend-gflag-util.cc@207 PS42, Line 207: " nit: don't break line here http://gerrit.cloudera.org:8080/#/c/19033/42/be/src/util/backend-gflag-util.cc@211 PS42, Line 211: min_processing_cost_per_thread What's the normal range? Why set the default value to 100? http://gerrit.cloudera.org:8080/#/c/19033/42/common/thrift/ImpalaService.thrift File common/thrift/ImpalaService.thrift: http://gerrit.cloudera.org:8080/#/c/19033/42/common/thrift/ImpalaService.thrift@767 PS42, Line 767: Control whether to display processing cost detail in query plan or not. The comment is different from the meaning of the query option. http://gerrit.cloudera.org:8080/#/c/19033/42/common/thrift/ImpalaService.thrift@771 PS42, Line 771: 64 Why is the max value 64? http://gerrit.cloudera.org:8080/#/c/19033/42/fe/src/main/java/org/apache/impala/planner/BroadcastProcessingCost.java File fe/src/main/java/org/apache/impala/planner/BroadcastProcessingCost.java: http://gerrit.cloudera.org:8080/#/c/19033/42/fe/src/main/java/org/apache/impala/planner/BroadcastProcessingCost.java@47 PS42, Line 47: Preconditions.checkState Could we move this Precondition.checkState to the constructor?
http://gerrit.cloudera.org:8080/#/c/19033/42/fe/src/main/java/org/apache/impala/planner/DataSink.java File fe/src/main/java/org/apache/impala/planner/DataSink.java: http://gerrit.cloudera.org:8080/#/c/19033/42/fe/src/main/java/org/apache/impala/planner/DataSink.java@68 PS42, Line 68: processingCost_ Should we add Preconditions to check if processingCost_ is valid? http://gerrit.cloudera.org:8080/#/c/19033/42/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java File fe/src/main/java/org/apache/impala/planner/ExchangeNode.java: http://gerrit.cloudera.org:8080/#/c/19033/42/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java@265 PS42, Line 265: per row batch per row? http://gerrit.cloudera.org:8080/#/c/19033/42/fe/src/main/java/org/apache/impala/util/ExprUtil.java File fe/src/main/java/org/apache/impala/util/ExprUtil.java: http://gerrit.cloudera.org:8080/#/c/19033/42/fe/src/main/java/org/apache/impala/util/ExprUtil.java@119 PS42, Line 119: e.getCost() : 1 What's the cost range? Is the value 1 the minimum value? -- To view, visit http://gerrit.cloudera.org:8080/19033 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 42 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Mon, 06 Feb 2023 19:12:54 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 42: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/12312/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- To view, visit http://gerrit.cloudera.org:8080/19033 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 42 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Sat, 04 Feb 2023 03:48:04 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 42: (7 comments) http://gerrit.cloudera.org:8080/#/c/19033/41/common/thrift/Frontend.thrift File common/thrift/Frontend.thrift: http://gerrit.cloudera.org:8080/#/c/19033/41/common/thrift/Frontend.thrift@769 PS41, Line 769: // The optional max_mem_limit to determine which executor group set to run for a query. : // The max_mem_limit value is set to the max_query_mem_limit attribute of the group set : // with name prefix 'exec_group_name_prefix' from the pool service. For each query, : // the frontend computes the per host estimated-memory after a compilation with a : // number of executor nodes from this group set and compares it with this variable. : 4: optional i64 max_mem_limit : : // The optional num_cores_per_executor is used to determine which > Restore the comment. Done http://gerrit.cloudera.org:8080/#/c/19033/41/common/thrift/Query.thrift File common/thrift/Query.thrift: http://gerrit.cloudera.org:8080/#/c/19033/41/common/thrift/Query.thrift@912 PS41, Line 912: > This need comment. Done http://gerrit.cloudera.org:8080/#/c/19033/41/common/thrift/Query.thrift@914 PS41, Line 914: > Consider dropping this and put it to LOG instead. Done http://gerrit.cloudera.org:8080/#/c/19033/41/fe/src/main/java/org/apache/impala/planner/PlanFragment.java File fe/src/main/java/org/apache/impala/planner/PlanFragment.java: http://gerrit.cloudera.org:8080/#/c/19033/41/fe/src/main/java/org/apache/impala/planner/PlanFragment.java@150 PS41, Line 150: adjustedInstanceCount_ > This can be set multiple time during effective parallelism computation. Added isFixedParallelism_. http://gerrit.cloudera.org:8080/#/c/19033/41/fe/src/main/java/org/apache/impala/planner/PlanFragment.java@953 PS41, Line 953:"ProcessingCost Fragment " + getId() + " has not been computed!"); : int nodeStepCount = getNumInstances() % getNumNodes() == 0 ? 
getNumNodes() : 1; : boolean canTryLower = adjus > I believe effective parallelism adjustment and processing cost finding can Done http://gerrit.cloudera.org:8080/#/c/19033/41/fe/src/main/java/org/apache/impala/planner/Planner.java File fe/src/main/java/org/apache/impala/planner/Planner.java: http://gerrit.cloudera.org:8080/#/c/19033/41/fe/src/main/java/org/apache/impala/planner/Planner.java@317 PS41, Line 317: : hasHeader = true; : } > This might still be useful to show in EXTENDED explain level. Done http://gerrit.cloudera.org:8080/#/c/19033/41/fe/src/main/java/org/apache/impala/planner/Planner.java@320 PS41, Line 320: arn if the planner is running in DEBUG mode. : if (request.query_ctx.client_request.query_options.planner_testcase_mode) { : str.append("WARNING > move to LOG instead. Done -- To view, visit http://gerrit.cloudera.org:8080/19033 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 42 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Sat, 04 Feb 2023 03:43:38 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has uploaded a new patch set (#42) to the change originally created by Qifan Chen. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage ..

IMPALA-11604 Planner changes for CPU usage

This patch augments IMPALA-10992 by establishing an infrastructure that allows the weighted total amount of data to process to be used as a new factor in the definition and selection of an executor group.

As the basis of the CPU costing model, we define ProcessingCost as the cost for a distinct PlanNode / DataSink / PlanFragment to process its input rows globally across all of its instances. The costing algorithm then tries to adjust the number of instances for each fragment by considering the production-consumption ratio between their plan nodes and the blocking-operator nature of those nodes, and finally returns a number representing the ideal CPU core count required for a query to run efficiently. The CPU costing algorithm is explained in more detail in the four steps below.

I. Compute ProcessingCost for each plan node and data sink.

The ProcessingCost of a PlanNode/DataSink is the weighted amount of data processed by that node/sink. The basic ProcessingCost is computed with the following general formula. ProcessingCost is a pair PC(D, N), where

  D = I * (C + M)

and
  D is the weighted amount of data processed;
  I is the input cardinality;
  C is the expression evaluation cost per row, set to the total weight of expression evaluation in the node/sink;
  M is the materialization cost per row, only used by scan and exchange nodes (otherwise 0);
  N is the number of instances. It defaults to D / MIN_COST_PER_THREAD (1 million), but is fixed for certain nodes/sinks and adjustable in step III.

In this patch, the weight of each expression evaluation is set to a constant of 1.

A description of the computation for each kind of PlanNode/DataSink is given below.

01. AggregationNode: Each AggregateInfo has its C set to the sum of its grouping expression and aggregate expression costs, and is assigned its own ProcessingCost. These ProcessingCosts are then summed to form the AggregationNode's ProcessingCost;
02. AnalyticEvalNode: C is the sum of the evaluation costs for analytic functions;
03. CardinalityCheckNode: Use the general formula, I = 1;
04. DataSourceScanNode: Follow the formula from the superclass ScanNode;
05. EmptySetNode: I = 0;
06. ExchangeNode: M = 1 / num rows per batch. In broadcast mode, the general formula is modified to D = D * number of receivers;
07. HashJoinNode:
      probe cost = PC(I0 * C(equi-join predicate), N) + PC(output cardinality * C(other join predicate), N)
      build cost = PC(I1 * C(equi-join predicate), N)
    with I0 and I1 as the input cardinalities of the probe and build sides, respectively. If the plan node does not have a separate build, ProcessingCost is the sum of probe cost and build cost. Otherwise, ProcessingCost is equal to probe cost.
08. HbaseScanNode: Follow the formula from the superclass ScanNode;
09. HdfsScanNode and KuduScanNode: Follow the formula from the superclass ScanNode with a modified N. N is mt_dop when query option mt_dop >= 1; otherwise, N is the number of nodes * max scan threads;
10. Nested loop join node:
    When the right child is not a SingularRowSrcNode:
      probe cost = PC(I0 * C(equi-join predicate), N) + PC(output cardinality * C(other join predicate), N)
      build cost = PC(I1 * C(equi-join predicate), N)
    When the right child is a SingularRowSrcNode:
      probe cost = PC(I0, N)
      build cost = PC(I0 * I1, N)
    with I0 and I1 as the input cardinalities of the probe and build sides, respectively. If the plan node does not have a separate build, ProcessingCost is the sum of probe cost and build cost. Otherwise, ProcessingCost is equal to probe cost.
11. ScanNode: M = average row size / ROWBATCH_MAX_MEM_USAGE (8MB);
12. SelectNode: Use the general formula;
13. SingularRowSrcNode: Since the node is involved once per input in a nested loop join, the contribution of this node is computed in the nested loop join;
14. SortNode: C is the evaluation cost for the sort expression;
15. SubplanNode: C is 1. I is the product of the cardinalities of the left and the right child;
16. Union node: C is the cost of result expression evaluation from all non-pass-through children;
17. Unnest node: I is the cardinality of the containing SubplanNode and C is 1.
18. DataStreamSink: M = 1 / num rows per batch.
19. JoinBuildSink: ProcessingCost is the build cost of its associated JoinNode.
20. PlanRootSink: If result spooling is enabled, C is the cost of output expression evaluation. Otherwise.
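The general formula and a few of the per-node rules above can be sketched in Java as follows. This is a minimal illustration, not Impala's ProcessingCost class: the class and method names are hypothetical, and details the commit message leaves open are assumptions flagged in the code comments (the default N rounds D / MIN_COST_PER_THREAD up with a floor of 1, adding two costs sums their D values and re-derives N, and the ExchangeNode case uses C = 0).

```java
/**
 * Hypothetical sketch of the ProcessingCost pair PC(D, N), where D = I * (C + M).
 * Not Impala's ProcessingCost class; names and rounding are illustrative.
 */
public final class ProcessingCostSketch {
  // Default N is D / MIN_COST_PER_THREAD (1 million), per the commit message.
  static final double MIN_COST_PER_THREAD = 1_000_000.0;
  // Assumption: "8MB" is read as 8 * 1024 * 1024 bytes.
  static final double ROWBATCH_MAX_MEM_USAGE = 8.0 * 1024 * 1024;

  /** Weighted amount of data D and instance count N. */
  static final class PC {
    final double d;
    final int n;

    PC(double d, int n) {
      this.d = d;
      this.n = n;
    }

    @Override
    public String toString() {
      return "PC(" + d + ", " + n + ")";
    }
  }

  /** Assumption: round D / MIN_COST_PER_THREAD up and never go below 1 instance. */
  static int defaultInstances(double d) {
    return (int) Math.max(1, Math.ceil(d / MIN_COST_PER_THREAD));
  }

  /** General formula D = I * (C + M) with the default N. */
  static PC general(long inputCardinality, double exprCostPerRow, double matCostPerRow) {
    double d = inputCardinality * (exprCostPerRow + matCostPerRow);
    return new PC(d, defaultInstances(d));
  }

  /** Assumption: adding two costs sums D and re-derives the default N. */
  static PC sum(PC a, PC b) {
    double d = a.d + b.d;
    return new PC(d, defaultInstances(d));
  }

  /** ScanNode: M = average row size / ROWBATCH_MAX_MEM_USAGE. */
  static PC scan(long inputCardinality, double exprCostPerRow, double avgRowSizeBytes) {
    return general(inputCardinality, exprCostPerRow, avgRowSizeBytes / ROWBATCH_MAX_MEM_USAGE);
  }

  /** ExchangeNode: M = 1 / rows per batch (C assumed 0); broadcast multiplies D. */
  static PC exchange(long inputCardinality, int rowsPerBatch, boolean broadcast,
      int numReceivers) {
    PC base = general(inputCardinality, 0.0, 1.0 / rowsPerBatch);
    if (!broadcast) return base;
    double d = base.d * numReceivers;
    return new PC(d, defaultInstances(d));
  }

  /** HashJoinNode: probe cost, plus build cost only when there is no separate build. */
  static PC hashJoin(long probeCard, long buildCard, long outputCard,
      double equiJoinCost, double otherJoinCost, boolean hasSeparateBuild) {
    PC probe = sum(general(probeCard, equiJoinCost, 0.0),
        general(outputCard, otherJoinCost, 0.0));
    if (hasSeparateBuild) return probe;
    return sum(probe, general(buildCard, equiJoinCost, 0.0));
  }

  public static void main(String[] args) {
    System.out.println(scan(10_000_000L, 2.0, 128.0));
    System.out.println(exchange(1_000_000L, 1024, true, 4));
    System.out.println(hashJoin(5_000_000L, 100_000L, 2_000_000L, 1.0, 1.0, false));
  }
}
```

Under these assumptions, scanning 10 million rows with C = 2 and 128-byte rows gives D of roughly 20.0 million and a default N of 21. The hash join example has no separate build, so its build-side cost is added to the probe-side cost, as rule 07 describes.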
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 41: (7 comments) Found some issues and potential improvements. http://gerrit.cloudera.org:8080/#/c/19033/41/common/thrift/Frontend.thrift File common/thrift/Frontend.thrift: http://gerrit.cloudera.org:8080/#/c/19033/41/common/thrift/Frontend.thrift@769 PS41, Line 769: // The following is a list of limits to help selecting a suitable executor group to : // run for a query. The group is identified by the name prefix 'exec_group_name_prefix' : // from the pool service. For each query, the frontend computes a value for each of : // the limit dimension and compares it with the limit. An executor group is chosen : // if all such values are equal to or less than their corresponding limits defined : // for the group. : : // The memory limit provides the per host estimated-memory limit. Restore the comment. http://gerrit.cloudera.org:8080/#/c/19033/41/common/thrift/Query.thrift File common/thrift/Query.thrift: http://gerrit.cloudera.org:8080/#/c/19033/41/common/thrift/Query.thrift@912 PS41, Line 912: optional i32 cores_required; This needs a comment. http://gerrit.cloudera.org:8080/#/c/19033/41/common/thrift/Query.thrift@914 PS41, Line 914: optional string debug_core_sizing_trace; Consider dropping this and putting it in the LOG instead. http://gerrit.cloudera.org:8080/#/c/19033/41/fe/src/main/java/org/apache/impala/planner/PlanFragment.java File fe/src/main/java/org/apache/impala/planner/PlanFragment.java: http://gerrit.cloudera.org:8080/#/c/19033/41/fe/src/main/java/org/apache/impala/planner/PlanFragment.java@150 PS41, Line 150: adjustedInstanceCount_ This can be set multiple times during effective parallelism computation. I think we should have a separate boolean flag to mark whether a fragment's instance count is fixed by some plan node specification (a leaf node such as a scan, an empty source node, a shared join build, etc.).
http://gerrit.cloudera.org:8080/#/c/19033/41/fe/src/main/java/org/apache/impala/planner/PlanFragment.java@953 PS41, Line 953: protected ProcessingCost traverseBlockingAwareCost(int maxInstancesPerNode, : List blockingTreeCosts, int parentInstanceCount, : boolean allowIncrement) { I believe effective parallelism adjustment and processing cost finding can be separated into their own functions. I'll look into this. http://gerrit.cloudera.org:8080/#/c/19033/41/fe/src/main/java/org/apache/impala/planner/Planner.java File fe/src/main/java/org/apache/impala/planner/Planner.java: http://gerrit.cloudera.org:8080/#/c/19033/41/fe/src/main/java/org/apache/impala/planner/Planner.java@317 PS41, Line 317: str.append("Efficient parallelism: "); : str.append(request.getCores_required()); : str.append("\n"); This might still be useful to show at the EXTENDED explain level. http://gerrit.cloudera.org:8080/#/c/19033/41/fe/src/main/java/org/apache/impala/planner/Planner.java@320 PS41, Line 320: str.append("Efficient parallelism trace: "); : str.append(request.getDebug_core_sizing_trace()); : str.append("\n"); Move to LOG instead. -- Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 41 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Fri, 03 Feb 2023 17:08:16 + Gerrit-HasComments: Yes
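The separate-flag design suggested in the PlanFragment.java@150 comment (answered with "Added isFixedParallelism_." in the PS42 reply) can be sketched as below. Only the field names adjustedInstanceCount_ and isFixedParallelism_ come from the review; the methods, the -1 sentinel, and the rule that a fixed fragment ignores later adjustments are hypothetical assumptions for illustration.

```java
/**
 * Hypothetical sketch: adjustedInstanceCount_ may be rewritten many times during
 * effective parallelism computation, so a separate flag records that a plan node
 * specification fixed the count. Not the actual PlanFragment code.
 */
public final class FragmentParallelismSketch {
  // -1 means "not adjusted yet" (assumption for this sketch).
  private int adjustedInstanceCount_ = -1;
  // Set when a plan node specification (e.g. scan, shared join build) fixes the count.
  private boolean isFixedParallelism_ = false;

  /** Pins the instance count; later adjustments are ignored. */
  void setFixedInstanceCount(int count) {
    adjustedInstanceCount_ = count;
    isFixedParallelism_ = true;
  }

  /**
   * Called repeatedly during effective parallelism computation. Returns false and
   * leaves the count unchanged when the fragment's parallelism is fixed.
   */
  boolean tryAdjustInstanceCount(int count) {
    if (isFixedParallelism_) return false;
    adjustedInstanceCount_ = count;
    return true;
  }

  int getAdjustedInstanceCount() { return adjustedInstanceCount_; }

  boolean isFixedParallelism() { return isFixedParallelism_; }

  public static void main(String[] args) {
    FragmentParallelismSketch scanFragment = new FragmentParallelismSketch();
    scanFragment.setFixedInstanceCount(6);
    scanFragment.tryAdjustInstanceCount(3);  // ignored: count stays 6
    FragmentParallelismSketch aggFragment = new FragmentParallelismSketch();
    aggFragment.tryAdjustInstanceCount(12);
    aggFragment.tryAdjustInstanceCount(8);   // a later pass overrides: count is 8
    System.out.println(scanFragment.getAdjustedInstanceCount() + " "
        + aggFragment.getAdjustedInstanceCount());  // prints "6 8"
  }
}
```

Keeping the flag separate from the count means repeated adjustment passes cannot silently override a count that a leaf node or shared join build dictated, which is the concern raised in the comment.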
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 41: (1 comment) ps40 fixes a bug in CreateInputCollocatedInstances and in the logic that determines whether the CPU requirement is satisfied. http://gerrit.cloudera.org:8080/#/c/19033/39//COMMIT_MSG Commit Message: http://gerrit.cloudera.org:8080/#/c/19033/39//COMMIT_MSG@248 PS39, Line 248: dinator log > typo: PlannerTest Done -- Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 41 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Thu, 02 Feb 2023 19:07:46 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 40: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/12297/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 40 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Thu, 02 Feb 2023 19:11:03 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has uploaded a new patch set (#41) to the change originally created by Qifan Chen. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage ..
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has uploaded a new patch set (#40) to the change originally created by Qifan Chen. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage ..
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 39: (1 comment) ps39 adds PlannerTest#testProcessingCost and changes the default value of query_cpu_requirement_scale to 1. http://gerrit.cloudera.org:8080/#/c/19033/39//COMMIT_MSG Commit Message: http://gerrit.cloudera.org:8080/#/c/19033/39//COMMIT_MSG@248 PS39, Line 248: PlanenrTest typo: PlannerTest -- Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 39 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Wed, 01 Feb 2023 22:45:11 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 39: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/12290/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 39 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Wed, 01 Feb 2023 22:40:46 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has uploaded a new patch set (#39) to the change originally created by Qifan Chen. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage ..
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 38: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/12278/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 38 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Tue, 31 Jan 2023 21:27:35 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 38: (17 comments) Thank you for your review, Qifan! http://gerrit.cloudera.org:8080/#/c/19033/37/be/src/scheduling/scheduler.cc File be/src/scheduling/scheduler.cc: http://gerrit.cloudera.org:8080/#/c/19033/37/be/src/scheduling/scheduler.cc@266 PS37, Line 266: cheduler::CheckEffective > nit. is positive Done. Moved to scheduler.h. http://gerrit.cloudera.org:8080/#/c/19033/37/be/src/scheduling/scheduler.cc@272 PS37, Line 272: || IsExceedMaxFsWriters(fragment_state, state)) { > Is it possible to exclude the checking for scan fragment and certain table Yes! Consolidated the case checking in this method. http://gerrit.cloudera.org:8080/#/c/19033/37/be/src/util/backend-gflag-util.cc File be/src/util/backend-gflag-util.cc: http://gerrit.cloudera.org:8080/#/c/19033/37/be/src/util/backend-gflag-util.cc@201 PS37, Line 201: 0.5 > Just wonder why not directly use a value of 2 here. My intention is to associate the scaling flag with the CPU requirement of the query, not with the total CPU cores of the executor group set. Hence, a value of 0.5 means halving the CpuRequirement returned by the costing algorithm (oversubscribing the executor group). http://gerrit.cloudera.org:8080/#/c/19033/37/common/thrift/Planner.thrift File common/thrift/Planner.thrift: http://gerrit.cloudera.org:8080/#/c/19033/37/common/thrift/Planner.thrift@87 PS37, Line 87: // > Miss the comment here Added comment. http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/analysis/Expr.java File fe/src/main/java/org/apache/impala/analysis/Expr.java: http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/analysis/Expr.java@473 PS37, Line 473: public long getNumDistinctValues() { return n > This is unused. However, this point me to existing evalCost_ added in IMPAL Removed.
http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java File fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java: http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java@364 PS37, Line 364: 'partitionByEq_' and 'orderBy > This does not match with the code at line 368: ExprUtil.computeExprCost(par Removed partitionByEq_ from the cost calculation. http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java@365 PS37, Line 365: titioned and sorte > This does not match with the code at line 368 ExprUtil.computeExprCost(orde Removed orderByEq_ from the cost calculation. http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/BroadcastProcessingCost.java File fe/src/main/java/org/apache/impala/planner/BroadcastProcessingCost.java: http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/BroadcastProcessingCost.java@33 PS37, Line 33: childP > May use the name childProcessingCost_ or add a comment to indicate this is Done http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/BroadcastProcessingCost.java@47 PS37, Line 47: onditions.checkState( > Code duplication with line at 40. Replaced the preconditions in line 40 with a single call to getMultiple(). http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/CoreRequirement.java File fe/src/main/java/org/apache/impala/planner/CoreRequirement.java: http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/CoreRequirement.java@29 PS37, Line 29: > Comment is missing. Done http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/CoreRequirement.java@31 PS37, Line 31: * A container class that represent CPU core requirement of certain subtree of a query or : * the query itself.
: */ > A comment for each data member. Done http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/DataSink.java File fe/src/main/java/org/apache/impala/planner/DataSink.java: http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/DataSink.java@49 PS37, Line 49: Set in computeProc > nit Set in computeProcessingCost() for a meaningful value. Done http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/DataSink.java@131 PS37, Line 131: ws p > nit. data fields in Done http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/DataSink.java@141 PS37, Line 141: setNumRowToProduce(Mat > This is missing in the comment at line 131 above. Done http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/MultipleProcessingCost.java File
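The reply on backend-gflag-util.cc@201 treats the flag as a multiplier on the query's CPU requirement: with 0.5, the requirement returned by the costing algorithm is halved, so a query can be placed on an executor group set with fewer cores than it asked for (oversubscription). A later patch set (ps39) changes the default of query_cpu_requirement_scale to 1. The Java sketch below is a hypothetical illustration of that arithmetic only; the method names, the round-up, and the comparison against the group set's total cores are assumptions, not the actual frontend or scheduler logic.

```java
/**
 * Hypothetical sketch: the scale applies to the query's CPU requirement, not to the
 * executor group set's core count. Names and rounding are illustrative assumptions.
 */
public final class CpuRequirementScaleSketch {
  /** Assumption: round the scaled requirement up, keeping at least one core. */
  static int scaledCoresRequired(int coresRequired, double scale) {
    return (int) Math.max(1, Math.ceil(coresRequired * scale));
  }

  /** Assumption: the query fits if its scaled requirement is within the group's cores. */
  static boolean fitsGroupSet(int coresRequired, double scale, int groupSetCores) {
    return scaledCoresRequired(coresRequired, scale) <= groupSetCores;
  }

  public static void main(String[] args) {
    // A query whose costing algorithm asks for 48 cores, against a 32-core group set.
    System.out.println(fitsGroupSet(48, 1.0, 32));  // prints false
    System.out.println(fitsGroupSet(48, 0.5, 32));  // prints true: 24 <= 32 (oversubscribed)
  }
}
```

Scaling the requirement by 0.5 and scaling the group set's cores by 2 give nearly the same yes/no answer (they can differ only through rounding); the reply's point is about which quantity the flag is semantically attached to.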
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has uploaded a new patch set (#38) to the change originally created by Qifan Chen. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage ..
IMPALA-11604 Planner changes for CPU usage
This patch augments IMPALA-10992 by establishing infrastructure that allows the weighted total amount of data to be processed to serve as a new factor in the definition and selection of an executor group.
At the core of the CPU costing model, we define ProcessingCost as the cost for a distinct PlanNode / DataSink / PlanFragment to process its input rows globally across all of its instances. The costing algorithm then tries to adjust the number of instances for each fragment by considering the production-consumption ratio and the blocking-operator nature of their plan nodes, and finally returns a number representing the ideal CPU core count required for the query to run efficiently. The CPU costing algorithm is explained in more detail in the four steps below.
I. Compute ProcessingCost for each plan node and data sink.
ProcessingCost of a PlanNode/DataSink is the weighted amount of data processed by that node/sink. The basic ProcessingCost is computed with the following general formula:
  ProcessingCost is a pair PC(D, N), where D = I * (C + M) and
    D is the weighted amount of data processed;
    I is the input cardinality;
    C is the expression evaluation cost per row, set to the total weight of expression evaluation in the node/sink;
    M is the materialization cost per row, used only by scan and exchange nodes (otherwise 0);
    N is the number of instances. It defaults to D / MIN_COST_PER_THREAD (1 million), but is fixed for certain nodes/sinks and adjustable in step III.
In this patch, the weight of each expression evaluation is set to a constant of 1.
The computation for each kind of PlanNode/DataSink is described below.
01. AggregationNode: Each AggregateInfo's C is the sum of its grouping and aggregate expression costs, and each AggregateInfo is assigned its own ProcessingCost. These ProcessingCosts are then summed to form the AggregationNode's ProcessingCost;
02. AnalyticEvalNode: C is the sum of the evaluation costs for analytic functions;
03. CardinalityCheckNode: Use the general formula, I = 1;
04. DataSourceScanNode: Follow the formula from the superclass ScanNode;
05. EmptySetNode: I = 0;
06. ExchangeNode: M = 1 / num rows per batch. In broadcast mode, the general formula is modified to D = D * number of receivers;
07. HashJoinNode:
    probe cost = PC(I0 * C(equiJoin predicate), N) + PC(output cardinality * C(otherJoin predicate), N)
    build cost = PC(I1 * C(equiJoin predicate), N)
    where I0 and I1 are the input cardinalities of the probe and build sides, respectively. If the plan node does not have a separate build, ProcessingCost is the sum of the probe cost and build cost. Otherwise, ProcessingCost is equal to the probe cost.
08. HbaseScanNode: Follow the formula from the superclass ScanNode;
09. HdfsScanNode and KuduScanNode: Follow the formula from the superclass ScanNode with a modified N. N is mt_dop when query option mt_dop >= 1; otherwise, N is the number of nodes * max scan threads;
10. Nested loop join node:
    When the right child is not a SingularRowSrcNode:
      probe cost = PC(I0 * C(equiJoin predicate), N) + PC(output cardinality * C(otherJoin predicate), N)
      build cost = PC(I1 * C(equiJoin predicate), N)
    When the right child is a SingularRowSrcNode:
      probe cost = PC(I0, N)
      build cost = PC(I0 * I1, N)
    where I0 and I1 are the input cardinalities of the probe and build sides, respectively. If the plan node does not have a separate build, ProcessingCost is the sum of the probe cost and build cost. Otherwise, ProcessingCost is equal to the probe cost.
11. ScanNode: M = average row size / ROWBATCH_MAX_MEM_USAGE (8MB);
12. SelectNode: Use the general formula;
13. SingularRowSrcNode: Since the node is involved once per input in the nested loop join, its contribution is computed in the nested loop join;
14. SortNode: C is the evaluation cost for the sort expression;
15. SubplanNode: C is 1. I is the product of the cardinalities of the left and right children;
16. Union node: C is the cost of result expression evaluation from all non-pass-through children;
17. Unnest node: I is the cardinality of the containing SubplanNode and C is 1.
18. DataStreamSink: M = 1 / num rows per batch.
19. JoinBuildSink: ProcessingCost is the build cost of its associated JoinNode.
20. PlanRootSink: If result spooling is enabled, C is the cost of output expression evaluation. Otherwise.
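The general formula in step I can be illustrated with a small Java sketch. It is a hypothetical helper, not Impala's ProcessingCost class: the class name is invented, MIN_COST_PER_THREAD is taken as 1 million from the commit message (the PS37 review quotes `100L` for the same constant), and rounding N up with a floor of one instance is an assumption.

```java
// Hypothetical sketch of the general formula PC(D, N) with D = I * (C + M).
final class ProcessingCostSketch {
  // Assumed value: the commit message says 1 million, while PS37 code used 100L.
  static final long MIN_COST_PER_THREAD = 1_000_000L;

  final double totalCost;  // D: weighted amount of data processed
  final int numInstances;  // N: number of instances

  private ProcessingCostSketch(double totalCost, int numInstances) {
    this.totalCost = totalCost;
    this.numInstances = numInstances;
  }

  // I = inputCardinality, C = exprCostPerRow, M = materializationCostPerRow.
  // N defaults to D / MIN_COST_PER_THREAD; rounding up and the floor of one
  // instance are assumptions of this sketch.
  static ProcessingCostSketch basic(
      long inputCardinality, double exprCostPerRow, double materializationCostPerRow) {
    double d = inputCardinality * (exprCostPerRow + materializationCostPerRow);
    long n = Math.max(1L, (long) Math.ceil(d / MIN_COST_PER_THREAD));
    return new ProcessingCostSketch(d, (int) Math.min(n, Integer.MAX_VALUE));
  }
}
```

With the uniform weight of 1, a SelectNode evaluating two predicates over 5 million rows gets C = 2 and M = 0, so D = 10,000,000 and N = 10.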
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Qifan Chen has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 37: (14 comments) Looks good! http://gerrit.cloudera.org:8080/#/c/19033/37/be/src/scheduling/scheduler.cc File be/src/scheduling/scheduler.cc: http://gerrit.cloudera.org:8080/#/c/19033/37/be/src/scheduling/scheduler.cc@266 PS37, Line 266: effective_instance_count nit. is positive http://gerrit.cloudera.org:8080/#/c/19033/37/be/src/scheduling/scheduler.cc@272 PS37, Line 272: if (effective_instance_count > 0) { Is it possible to exclude the check for scan fragments and certain table sink fragments here? http://gerrit.cloudera.org:8080/#/c/19033/37/be/src/util/backend-gflag-util.cc File be/src/util/backend-gflag-util.cc: http://gerrit.cloudera.org:8080/#/c/19033/37/be/src/util/backend-gflag-util.cc@201 PS37, Line 201: 0.5 Just wondering why not directly use a value of 2 here. http://gerrit.cloudera.org:8080/#/c/19033/37/common/thrift/Planner.thrift File common/thrift/Planner.thrift: http://gerrit.cloudera.org:8080/#/c/19033/37/common/thrift/Planner.thrift@87 PS37, Line 87: 14: The comment is missing here. http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java File fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java: http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java@364 PS37, Line 364: 'partitionExprs_' is excluded This does not match with the code at line 368: ExprUtil.computeExprCost(partitionByEq_) http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java@365 PS37, Line 365: sorted within each This does not match with the code at line 368 ExprUtil.computeExprCost(orderByEq_) http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/BroadcastProcessingCost.java File 
fe/src/main/java/org/apache/impala/planner/BroadcastProcessingCost.java: http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/BroadcastProcessingCost.java@33 PS37, Line 33: cost_; May use the name childProcessingCost_ or add a comment to indicate this is the cost from the child. http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/BroadcastProcessingCost.java@47 PS37, Line 47: multiple_.get() > 0, "BroadcastProcessingCost: multiple must be greater than 0!"); Code duplication with line at 40. http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/CoreRequirement.java File fe/src/main/java/org/apache/impala/planner/CoreRequirement.java: http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/CoreRequirement.java@29 PS37, Line 29: Comment is missing. http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/CoreRequirement.java@31 PS37, Line 31: private final List ids_; : private final List counts_; : private int total_ = -1; A comment for each data member. http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/DataSink.java File fe/src/main/java/org/apache/impala/planner/DataSink.java: http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/DataSink.java@49 PS37, Line 49: Gets set correctly nit Set in computeProcessingCost() for a meaningful value. http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/DataSink.java@131 PS37, Line 131: into nit. data fields in http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/DataSink.java@141 PS37, Line 141: setNumInstanceExpected This is missing in the comment at line 131 above. 
http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/MultipleProcessingCost.java File fe/src/main/java/org/apache/impala/planner/MultipleProcessingCost.java: http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/MultipleProcessingCost.java@26 PS37, Line 26: multiple_ multiplier_ is better. -- To view, visit http://gerrit.cloudera.org:8080/19033 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 37 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Tue, 31 Jan 2023 17:29:32 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Qifan Chen has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 37: Will review when I have a chance this week. -- To view, visit http://gerrit.cloudera.org:8080/19033 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 37 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Mon, 30 Jan 2023 23:59:38 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 37: (3 comments) http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/analysis/Expr.java File fe/src/main/java/org/apache/impala/analysis/Expr.java: http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/analysis/Expr.java@473 PS37, Line 473: public float evalCost() { return evalCost_; } This is unused. However, it points me to the existing evalCost_ added in IMPALA-2805. We should experiment with using that evalCost_ weighting vs. an equal cost of 1 per expression. http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/PlanFragment.java File fe/src/main/java/org/apache/impala/planner/PlanFragment.java: http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/PlanFragment.java@1025 PS37, Line 1025: childCostChange = adjustNonSharedJoinBuildChildCount(); I'm having second thoughts about this join-build recomputation for non-shared join builds (PARTITIONED). Since the build is PARTITIONED, changing the instance count of the join fragment will not change the output cardinality of the join-build fragment. Hence, the total processing cost of the exchange node does not change, and recomputation is not necessary. http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/ProcessingCost.java File fe/src/main/java/org/apache/impala/planner/ProcessingCost.java: http://gerrit.cloudera.org:8080/#/c/19033/37/fe/src/main/java/org/apache/impala/planner/ProcessingCost.java@40 PS37, Line 40: public final static long MIN_COST_PER_THREAD = 100L; This should be a backend flag instead of a constant so that tuning is possible. 
-- To view, visit http://gerrit.cloudera.org:8080/19033 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 37 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Mon, 30 Jan 2023 20:25:12 + Gerrit-HasComments: Yes
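The Expr.java comment above proposes comparing the uniform weight of 1 per expression against the existing evalCost_ weighting from IMPALA-2805. A hypothetical Java sketch of the two options follows; ExprStub is a stand-in invented here, not Impala's Expr class, and the cost values used with it are made up.

```java
import java.util.List;

// Hypothetical comparison of two per-row expression weightings for C.
final class ExprCostSketch {
  // Stand-in for an analyzed expression; evalCost plays the role of Expr.evalCost_.
  static final class ExprStub {
    final String name;
    final float evalCost;

    ExprStub(String name, float evalCost) {
      this.name = name;
      this.evalCost = evalCost;
    }
  }

  // Approach described in the commit message: each expression weighs 1.
  static float uniformCost(List<ExprStub> exprs) {
    return exprs.size();
  }

  // Alternative under discussion: sum each expression's own evaluation cost.
  static float weightedCost(List<ExprStub> exprs) {
    float total = 0f;
    for (ExprStub e : exprs) {
      total += e.evalCost;
    }
    return total;
  }
}
```

Under uniform weighting, a cheap equality and an expensive string function contribute equally to C; the weighted variant lets C reflect their relative cost, which is what the proposed experiment would compare.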
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 37: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/12263/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- To view, visit http://gerrit.cloudera.org:8080/19033 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 37 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Mon, 30 Jan 2023 17:42:13 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 37: ps37 is a rebase of ps36 -- To view, visit http://gerrit.cloudera.org:8080/19033 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 37 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Mon, 30 Jan 2023 17:23:09 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has uploaded a new patch set (#37) to the change originally created by Qifan Chen. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage ..
IMPALA-11604 Planner changes for CPU usage
This patch augments IMPALA-10992 by establishing infrastructure that allows the weighted total amount of data to be processed to serve as a new factor in the definition and selection of an executor group.
At the core of the CPU costing model, we define ProcessingCost as the cost for a distinct PlanNode / DataSink / PlanFragment to process its input rows globally across all of its instances. The costing algorithm then tries to adjust the number of instances for each fragment by considering the production-consumption ratio and the blocking-operator nature of their plan nodes, and finally returns a number representing the ideal CPU core count required for the query to run efficiently. The CPU costing algorithm is explained in more detail in the four steps below.
I. Compute ProcessingCost for each plan node and data sink.
ProcessingCost of a PlanNode/DataSink is the weighted amount of data processed by that node/sink. The basic ProcessingCost is computed with the following general formula:
  ProcessingCost is a pair PC(D, N), where D = I * (C + M) and
    D is the weighted amount of data processed;
    I is the input cardinality;
    C is the expression evaluation cost per row, set to the total weight of expression evaluation in the node/sink;
    M is the materialization cost per row, used only by scan and exchange nodes (otherwise 0);
    N is the number of instances. It defaults to D / MIN_COST_PER_THREAD (1 million), but is fixed for certain nodes/sinks and adjustable in step III.
In this patch, the weight of each expression evaluation is set to a constant of 1.
The computation for each kind of PlanNode/DataSink is described below.
01. AggregationNode: Each AggregateInfo's C is the sum of its grouping and aggregate expression costs, and each AggregateInfo is assigned its own ProcessingCost. These ProcessingCosts are then summed to form the AggregationNode's ProcessingCost;
02. AnalyticEvalNode: C is the sum of the evaluation costs for analytic functions, partition-by-equal, and order-by-equal predicates;
03. CardinalityCheckNode: Use the general formula, I = 1;
04. DataSourceScanNode: Follow the formula from the superclass ScanNode;
05. EmptySetNode: I = 0;
06. ExchangeNode: M = 1 / num rows per batch. In broadcast mode, the general formula is modified to D = D * number of receivers;
07. HashJoinNode:
    probe cost = PC(I0 * C(equiJoin predicate), N) + PC(output cardinality * C(otherJoin predicate), N)
    build cost = PC(I1 * C(equiJoin predicate), N)
    where I0 and I1 are the input cardinalities of the probe and build sides, respectively. If the plan node does not have a separate build, ProcessingCost is the sum of the probe cost and build cost. Otherwise, ProcessingCost is equal to the probe cost.
08. HbaseScanNode: Follow the formula from the superclass ScanNode;
09. HdfsScanNode and KuduScanNode: Follow the formula from the superclass ScanNode with a modified N. N is mt_dop when query option mt_dop >= 1; otherwise, N is the number of nodes * max scan threads;
10. Nested loop join node:
    When the right child is not a SingularRowSrcNode:
      probe cost = PC(I0 * C(equiJoin predicate), N) + PC(output cardinality * C(otherJoin predicate), N)
      build cost = PC(I1 * C(equiJoin predicate), N)
    When the right child is a SingularRowSrcNode:
      probe cost = PC(I0, N)
      build cost = PC(I0 * I1, N)
    where I0 and I1 are the input cardinalities of the probe and build sides, respectively. If the plan node does not have a separate build, ProcessingCost is the sum of the probe cost and build cost. Otherwise, ProcessingCost is equal to the probe cost.
11. ScanNode: M = average row size / ROWBATCH_MAX_MEM_USAGE (8MB);
12. SelectNode: Use the general formula;
13. SingularRowSrcNode: Since the node is involved once per input in the nested loop join, its contribution is computed in the nested loop join;
14. SortNode: C is the evaluation cost for the sort expression;
15. SubplanNode: C is 1. I is the product of the cardinalities of the left and right children;
16. Union node: C is the cost of result expression evaluation from all non-pass-through children;
17. Unnest node: I is the cardinality of the containing SubplanNode and C is 1.
18. DataStreamSink: M = 1 / num rows per batch.
19. JoinBuildSink: ProcessingCost is the build cost of its associated JoinNode.
20. PlanRootSink: If result spooling is enabled, C is the cost of
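Item 07 of the commit message above defines the HashJoinNode costs. Because the probe and build terms are PCs over the same N, this hypothetical Java sketch tracks only the D component; the class and method names are invented for illustration.

```java
// Hypothetical sketch of the HashJoinNode cost formulas (D values only).
final class HashJoinCostSketch {
  // probe cost = I0 * C(equiJoin) + outputCardinality * C(otherJoin)
  static double probeCost(long probeCard, long outputCard,
      double equiJoinCost, double otherJoinCost) {
    return probeCard * equiJoinCost + outputCard * otherJoinCost;
  }

  // build cost = I1 * C(equiJoin)
  static double buildCost(long buildCard, double equiJoinCost) {
    return buildCard * equiJoinCost;
  }

  // Without a separate build, the join node carries probe + build cost.
  // With a separate build, the node carries only the probe cost; per item 19,
  // the build cost is then attributed to the JoinBuildSink.
  static double nodeCost(long probeCard, long buildCard, long outputCard,
      double equiJoinCost, double otherJoinCost, boolean hasSeparateBuild) {
    double probe = probeCost(probeCard, outputCard, equiJoinCost, otherJoinCost);
    return hasSeparateBuild ? probe : probe + buildCost(buildCard, equiJoinCost);
  }
}
```

For example, with I0 = 1000, I1 = 200, an output cardinality of 500, C(equiJoin) = 1 and C(otherJoin) = 2, the probe cost is 2000 and the build cost is 200, so the node carries 2200 without a separate build and 2000 with one.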
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 36: ps36 adds the following:
- CheckEffectiveInstanceCount in scheduler.cc, which verifies that the backend schedules no more fragment instances than the effective_instance_count calculated by the planner.
- query_cpu_requirement_scale, a config option to scale the CPU requirement down or up in case the costing algorithm overestimates or underestimates it and causes a regression.
- A breakdown of traverseAwareCost() into several functions, with cost recomputation in some cases.
- CPU matching logic in Frontend.java.
-- To view, visit http://gerrit.cloudera.org:8080/19033 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 36 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Fri, 27 Jan 2023 20:31:56 + Gerrit-HasComments: No
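The ps36 notes above add a CPU-requirement scale (query_cpu_requirement_scale) and a backend check (CheckEffectiveInstanceCount). The real check lives in scheduler.cc (C++); the following Java sketch only illustrates the intended arithmetic. Its class and method names, the rounding rule, and the treatment of a non-positive effective_instance_count (inferred from the `if (effective_instance_count > 0)` guard quoted in the PS37 review) are assumptions.

```java
// Hypothetical sketch of the two ps36 additions; names and rounding are assumptions.
final class CpuRequirementSketch {
  // Scale the planner's raw core estimate by a query_cpu_requirement_scale-like
  // factor (for example 0.5 to halve an overestimate). Rounding up and the
  // floor of one core are assumptions.
  static int scaledCoreRequirement(int rawCores, double scale) {
    if (!(scale > 0)) {
      throw new IllegalArgumentException("scale must be positive");
    }
    return (int) Math.max(1L, (long) Math.ceil(rawCores * scale));
  }

  // Mirrors the intent of CheckEffectiveInstanceCount: a positive
  // effective_instance_count caps the scheduled instances; a non-positive
  // value is assumed to mean that no cap was computed.
  static boolean instanceCountOk(int scheduledInstances, int effectiveInstanceCount) {
    return effectiveInstanceCount <= 0 || scheduledInstances <= effectiveInstanceCount;
  }
}
```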
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage .. Patch Set 36: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/12248/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- To view, visit http://gerrit.cloudera.org:8080/19033 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: If32dc770dfffcdd0be2ba789a7720952c68a Gerrit-Change-Number: 19033 Gerrit-PatchSet: 36 Gerrit-Owner: Qifan Chen Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Qifan Chen Gerrit-Reviewer: Riza Suminto Gerrit-Reviewer: Wenzhe Zhou Gerrit-Comment-Date: Fri, 27 Jan 2023 20:20:25 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-11604 Planner changes for CPU usage
Riza Suminto has uploaded a new patch set (#36) to the change originally created by Qifan Chen. ( http://gerrit.cloudera.org:8080/19033 ) Change subject: IMPALA-11604 Planner changes for CPU usage ..
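Items 06, 11 and 18 of the commit message give the materialization term M for exchanges, scans, and data stream sinks. This hypothetical Java sketch applies them; it reads "8MB" as 8 * 1024 * 1024 bytes and takes rows per batch as a parameter, both assumptions. With C = 0, an exchange's D is simply the number of row batches sent, multiplied by the receiver count in broadcast mode.

```java
// Hypothetical sketch of the materialization term M for scans and exchanges.
final class MaterializationCostSketch {
  // Assumption: "8MB" in the commit message read as 8 MiB.
  static final long ROWBATCH_MAX_MEM_USAGE = 8L * 1024 * 1024;

  // ScanNode: M = average row size / ROWBATCH_MAX_MEM_USAGE.
  static double scanM(double avgRowSizeBytes) {
    return avgRowSizeBytes / ROWBATCH_MAX_MEM_USAGE;
  }

  // ExchangeNode and DataStreamSink: M = 1 / rows per batch.
  static double exchangeM(int rowsPerBatch) {
    if (rowsPerBatch <= 0) {
      throw new IllegalArgumentException("rowsPerBatch must be positive");
    }
    return 1.0 / rowsPerBatch;
  }

  // ExchangeNode: D = I * (C + M), multiplied by the receiver count when broadcasting.
  static double exchangeCost(long inputCard, double exprCostPerRow,
      int rowsPerBatch, boolean broadcast, int numReceivers) {
    double d = inputCard * (exprCostPerRow + exchangeM(rowsPerBatch));
    return broadcast ? d * numReceivers : d;
  }
}
```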