[Impala-ASF-CR] IMPALA-11604 (part 2): Compute Effective Parallelism of Query

2023-03-08 Thread Riza Suminto (Code Review)
Riza Suminto has submitted this change and it was merged. ( 
http://gerrit.cloudera.org:8080/19593 )

Change subject: IMPALA-11604 (part 2): Compute Effective Parallelism of Query
..

IMPALA-11604 (part 2): Compute Effective Parallelism of Query

Part 1 of IMPALA-11604 implements the ProcessingCost model for each
PlanNode and DataSink. This second part builds on top of ProcessingCost
model by adjusting the number of instances for each fragment after
considering their production-consumption ratio, and then finally returns
a number representing an ideal CPU core count required for a query to
run efficiently. A more detailed explanation of the CPU costing
algorithm can be found in the three steps below.

I. Compute the total ProcessingCost of a fragment.

The costing algorithm splits a query fragment into several segments
divided by blocking PlanNode/DataSink boundary. Each fragment segment is
a subtree of PlanNodes/DataSink in the fragment with a DataSink or
blocking PlanNode as root and non-blocking leaves. All other nodes in
the segment are non-blocking. PlanNodes or DataSink that belong to the
same segment will have their ProcessingCost summed. A new CostingSegment
class is added to represent this segment.

A fragment that has a blocking PlanNode or blocking DataSink is called a
blocking fragment. Currently, only JoinBuildSink is considered as
blocking DataSink. A fragment without any blocking nodes is called a
non-blocking fragment. Step III discuss further about blocking and
non-blocking fragment.

Take an example of the following fragment plant, which is blocking since
it has 3 blocking PlanNode: 12:AGGREGATE, 06:SORT, and 08:TOP-N.

  F03:PLAN FRAGMENT [HASH(i_class)] hosts=3 instances=6 (adjusted from 12)
  fragment-costs=[34974657, 2159270, 23752870, 22]
  08:TOP-N [LIMIT=100]
  |  cost=900
  |
  07:ANALYTIC
  |  cost=23751970
  |
  06:SORT
  |  cost=2159270
  |
  12:AGGREGATE [FINALIZE]
  |  cost=34548320
  |
  11:EXCHANGE [HASH(i_class)]
 cost=426337

In bottom-up direction, there exist four segments in F03:
  Blocking segment 1: (11:EXCHANGE, 12:AGGREGATE)
  Blocking segment 2: 06:SORT
  Blocking segment 3: (07:ANALYTIC, 08:TOP-N)
  Non-blocking segment 4: DataStreamSink of F03

Therefore we have:
  PC(segment 1) = 426337+34548320
  PC(segment 2) = 2159270
  PC(segment 3) = 23751970+900
  PC(segment 4) = 22

These per-segment costs stored in a CostingSegment tree rooted at
PlanFragment.rootSegment_, and are [34974657, 2159270, 23752870, 22]
respectively after the post-order traversal.

This is implemented in PlanFragment.computeCostingSegment() and
PlanFragment.collectCostingSegmentHelper().

II. Compute the effective degree of parallelism (EDoP) of fragments.

The costing algorithm walks PlanFragments of the query plan tree in
post-order traversal. Upon visiting a PlanFragment, the costing
algorithm attempts to adjust the number of instances (effective
parallelism) of that fragment by comparing the last segment's
ProcessingCost of its child and production-consumption rate between its
adjacent segments from step I. To simplify this initial implementation,
the parallelism of PlanFragment containing EmptySetNode, UnionNode, or
ScanNode will remain unchanged (follow MT_DOP).

This step is implemented at PlanFragment.traverseEffectiveParallelism().

III. Compute the EDoP of the query.

Effective parallelism of a query is the maximum upper bound of CPU core
count that can parallelly work on a query when considering the
overlapping between fragment execution and blocking operators. We
compute this in a similar post-order traversal as step II and split the
query tree into blocking fragment subtrees similar to step I. The
following is an example of a query plan from TPCDS-Q12.

  F04:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
  PLAN-ROOT SINK
  |
  13:MERGING-EXCHANGE [UNPARTITIONED]
  |
  F03:PLAN FRAGMENT [HASH(i_class)] hosts=3 instances=3 (adjusted from 12)
  08:TOP-N [LIMIT=100]
  |
  07:ANALYTIC
  |
  06:SORT
  |
  12:AGGREGATE [FINALIZE]
  |
  11:EXCHANGE [HASH(i_class)]
  |
  F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=12
  05:AGGREGATE [STREAMING]
  |
  04:HASH JOIN [INNER JOIN, BROADCAST]
  |
  |--F05:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
  |  JOIN BUILD
  |  |
  |  10:EXCHANGE [BROADCAST]
  |  |
  |  F02:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
  |  02:SCAN HDFS [tpcds10_parquet.date_dim, RANDOM]
  |
  03:HASH JOIN [INNER JOIN, BROADCAST]
  |
  |--F06:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
  |  JOIN BUILD
  |  |
  |  09:EXCHANGE [BROADCAST]
  |  |
  |  F01:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
  |  01:SCAN HDFS [tpcds10_parquet.item, RANDOM]
  |
  00:SCAN HDFS [tpcds10_parquet.web_sales, RANDOM]

A blocking fragment is a fragment that has a blocking PlanNode or
blocking DataSink in it. The costing algorithm splits the query plan
tree into blocking subtrees divided by blocking fragment boundary. 

[Impala-ASF-CR] IMPALA-11604 (part 2): Compute Effective Parallelism of Query

2023-03-08 Thread Riza Suminto (Code Review)
Riza Suminto has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/19593 )

Change subject: IMPALA-11604 (part 2): Compute Effective Parallelism of Query
..


Patch Set 1: Code-Review+2

Thanks everyone!
Carry two +1.


--
To view, visit http://gerrit.cloudera.org:8080/19593
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ibb2a796fdf78336e95991955d89c671ec82be62e
Gerrit-Change-Number: 19593
Gerrit-PatchSet: 1
Gerrit-Owner: Riza Suminto 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Kurt Deschler 
Gerrit-Reviewer: Qifan Chen 
Gerrit-Reviewer: Riza Suminto 
Gerrit-Reviewer: Wenzhe Zhou 
Gerrit-Comment-Date: Wed, 08 Mar 2023 15:31:38 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-11604 (part 2): Compute Effective Parallelism of Query

2023-03-08 Thread Kurt Deschler (Code Review)
Kurt Deschler has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/19593 )

Change subject: IMPALA-11604 (part 2): Compute Effective Parallelism of Query
..


Patch Set 1: Code-Review+1


--
To view, visit http://gerrit.cloudera.org:8080/19593
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ibb2a796fdf78336e95991955d89c671ec82be62e
Gerrit-Change-Number: 19593
Gerrit-PatchSet: 1
Gerrit-Owner: Riza Suminto 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Kurt Deschler 
Gerrit-Reviewer: Qifan Chen 
Gerrit-Reviewer: Riza Suminto 
Gerrit-Reviewer: Wenzhe Zhou 
Gerrit-Comment-Date: Wed, 08 Mar 2023 13:53:13 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-11604 (part 2): Compute Effective Parallelism of Query

2023-03-07 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/19593 )

Change subject: IMPALA-11604 (part 2): Compute Effective Parallelism of Query
..


Patch Set 1: Verified+1


--
To view, visit http://gerrit.cloudera.org:8080/19593
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ibb2a796fdf78336e95991955d89c671ec82be62e
Gerrit-Change-Number: 19593
Gerrit-PatchSet: 1
Gerrit-Owner: Riza Suminto 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Kurt Deschler 
Gerrit-Reviewer: Qifan Chen 
Gerrit-Reviewer: Riza Suminto 
Gerrit-Reviewer: Wenzhe Zhou 
Gerrit-Comment-Date: Tue, 07 Mar 2023 08:36:37 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-11604 (part 2): Compute Effective Parallelism of Query

2023-03-06 Thread Wenzhe Zhou (Code Review)
Wenzhe Zhou has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/19593 )

Change subject: IMPALA-11604 (part 2): Compute Effective Parallelism of Query
..


Patch Set 1: Code-Review+1


--
To view, visit http://gerrit.cloudera.org:8080/19593
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ibb2a796fdf78336e95991955d89c671ec82be62e
Gerrit-Change-Number: 19593
Gerrit-PatchSet: 1
Gerrit-Owner: Riza Suminto 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Kurt Deschler 
Gerrit-Reviewer: Qifan Chen 
Gerrit-Reviewer: Riza Suminto 
Gerrit-Reviewer: Wenzhe Zhou 
Gerrit-Comment-Date: Tue, 07 Mar 2023 05:43:15 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-11604 (part 2): Compute Effective Parallelism of Query

2023-03-06 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/19593 )

Change subject: IMPALA-11604 (part 2): Compute Effective Parallelism of Query
..


Patch Set 1:

Build Successful

https://jenkins.impala.io/job/gerrit-code-review-checks/12563/ : Initial code 
review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun 
to run full precommit tests.


--
To view, visit http://gerrit.cloudera.org:8080/19593
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ibb2a796fdf78336e95991955d89c671ec82be62e
Gerrit-Change-Number: 19593
Gerrit-PatchSet: 1
Gerrit-Owner: Riza Suminto 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Kurt Deschler 
Gerrit-Reviewer: Qifan Chen 
Gerrit-Reviewer: Riza Suminto 
Gerrit-Reviewer: Wenzhe Zhou 
Gerrit-Comment-Date: Tue, 07 Mar 2023 03:47:57 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-11604 (part 2): Compute Effective Parallelism of Query

2023-03-06 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/19593 )

Change subject: IMPALA-11604 (part 2): Compute Effective Parallelism of Query
..


Patch Set 1:

Build started: https://jenkins.impala.io/job/gerrit-verify-dryrun/9116/ 
DRY_RUN=true


--
To view, visit http://gerrit.cloudera.org:8080/19593
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ibb2a796fdf78336e95991955d89c671ec82be62e
Gerrit-Change-Number: 19593
Gerrit-PatchSet: 1
Gerrit-Owner: Riza Suminto 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Kurt Deschler 
Gerrit-Reviewer: Qifan Chen 
Gerrit-Reviewer: Riza Suminto 
Gerrit-Reviewer: Wenzhe Zhou 
Gerrit-Comment-Date: Tue, 07 Mar 2023 03:32:10 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-11604 (part 2): Compute Effective Parallelism of Query

2023-03-06 Thread Riza Suminto (Code Review)
Riza Suminto has uploaded this change for review. ( 
http://gerrit.cloudera.org:8080/19593


Change subject: IMPALA-11604 (part 2): Compute Effective Parallelism of Query
..

IMPALA-11604 (part 2): Compute Effective Parallelism of Query

Part 1 of IMPALA-11604 implements the ProcessingCost model for each
PlanNode and DataSink. This second part builds on top of ProcessingCost
model by adjusting the number of instances for each fragment after
considering their production-consumption ratio, and then finally returns
a number representing an ideal CPU core count required for a query to
run efficiently. A more detailed explanation of the CPU costing
algorithm can be found in the three steps below.

I. Compute the total ProcessingCost of a fragment.

The costing algorithm splits a query fragment into several segments
divided by blocking PlanNode/DataSink boundary. Each fragment segment is
a subtree of PlanNodes/DataSink in the fragment with a DataSink or
blocking PlanNode as root and non-blocking leaves. All other nodes in
the segment are non-blocking. PlanNodes or DataSink that belong to the
same segment will have their ProcessingCost summed. A new CostingSegment
class is added to represent this segment.

A fragment that has a blocking PlanNode or blocking DataSink is called a
blocking fragment. Currently, only JoinBuildSink is considered as
blocking DataSink. A fragment without any blocking nodes is called a
non-blocking fragment. Step III discuss further about blocking and
non-blocking fragment.

Take an example of the following fragment plant, which is blocking since
it has 3 blocking PlanNode: 12:AGGREGATE, 06:SORT, and 08:TOP-N.

  F03:PLAN FRAGMENT [HASH(i_class)] hosts=3 instances=6 (adjusted from 12)
  fragment-costs=[34974657, 2159270, 23752870, 22]
  08:TOP-N [LIMIT=100]
  |  cost=900
  |
  07:ANALYTIC
  |  cost=23751970
  |
  06:SORT
  |  cost=2159270
  |
  12:AGGREGATE [FINALIZE]
  |  cost=34548320
  |
  11:EXCHANGE [HASH(i_class)]
 cost=426337

In bottom-up direction, there exist four segments in F03:
  Blocking segment 1: (11:EXCHANGE, 12:AGGREGATE)
  Blocking segment 2: 06:SORT
  Blocking segment 3: (07:ANALYTIC, 08:TOP-N)
  Non-blocking segment 4: DataStreamSink of F03

Therefore we have:
  PC(segment 1) = 426337+34548320
  PC(segment 2) = 2159270
  PC(segment 3) = 23751970+900
  PC(segment 4) = 22

These per-segment costs stored in a CostingSegment tree rooted at
PlanFragment.rootSegment_, and are [34974657, 2159270, 23752870, 22]
respectively after the post-order traversal.

This is implemented in PlanFragment.computeCostingSegment() and
PlanFragment.collectCostingSegmentHelper().

II. Compute the effective degree of parallelism (EDoP) of fragments.

The costing algorithm walks PlanFragments of the query plan tree in
post-order traversal. Upon visiting a PlanFragment, the costing
algorithm attempts to adjust the number of instances (effective
parallelism) of that fragment by comparing the last segment's
ProcessingCost of its child and production-consumption rate between its
adjacent segments from step I. To simplify this initial implementation,
the parallelism of PlanFragment containing EmptySetNode, UnionNode, or
ScanNode will remain unchanged (follow MT_DOP).

This step is implemented at PlanFragment.traverseEffectiveParallelism().

III. Compute the EDoP of the query.

Effective parallelism of a query is the maximum upper bound of CPU core
count that can parallelly work on a query when considering the
overlapping between fragment execution and blocking operators. We
compute this in a similar post-order traversal as step II and split the
query tree into blocking fragment subtrees similar to step I. The
following is an example of a query plan from TPCDS-Q12.

  F04:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
  PLAN-ROOT SINK
  |
  13:MERGING-EXCHANGE [UNPARTITIONED]
  |
  F03:PLAN FRAGMENT [HASH(i_class)] hosts=3 instances=3 (adjusted from 12)
  08:TOP-N [LIMIT=100]
  |
  07:ANALYTIC
  |
  06:SORT
  |
  12:AGGREGATE [FINALIZE]
  |
  11:EXCHANGE [HASH(i_class)]
  |
  F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=12
  05:AGGREGATE [STREAMING]
  |
  04:HASH JOIN [INNER JOIN, BROADCAST]
  |
  |--F05:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
  |  JOIN BUILD
  |  |
  |  10:EXCHANGE [BROADCAST]
  |  |
  |  F02:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
  |  02:SCAN HDFS [tpcds10_parquet.date_dim, RANDOM]
  |
  03:HASH JOIN [INNER JOIN, BROADCAST]
  |
  |--F06:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
  |  JOIN BUILD
  |  |
  |  09:EXCHANGE [BROADCAST]
  |  |
  |  F01:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
  |  01:SCAN HDFS [tpcds10_parquet.item, RANDOM]
  |
  00:SCAN HDFS [tpcds10_parquet.web_sales, RANDOM]

A blocking fragment is a fragment that has a blocking PlanNode or
blocking DataSink in it. The costing algorithm splits the query plan
tree into blocking subtrees divided by blocking fragment boundary. Each