Csaba Ringhofer has posted comments on this change. ( http://gerrit.cloudera.org:8080/19430 )
Change subject: IMPALA-3120: Support Bucket Shuffle Join for bucketed table
......................................................................


Patch Set 9:

(14 comments)

Thanks for working on this optimization, looks great! I haven't processed the whole patch yet; I mainly have high-level comments.

http://gerrit.cloudera.org:8080/#/c/19430/9//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/19430/9//COMMIT_MSG@9
PS9, Line 9: and have operations
Besides the bucket operations, do we also apply predicates to buckets? For example, if a table is bucketed by id, id=2 can filter the scan down to a single bucket (and id in (2,3) to at most 2 buckets).

http://gerrit.cloudera.org:8080/#/c/19430/9//COMMIT_MSG@10
PS9, Line 10: such as join, group by, sort by, etc.
Can you add more info about these optimizations?
- For sort, bucketing is only applied when the sort is part of a partitioned analytic function, right?
- Is bucketing supported when there are multiple bucketing columns?
- Is bucketing supported if there are multiple key columns in a join / group by, and only a subset of them are bucketing columns?

http://gerrit.cloudera.org:8080/#/c/19430/9//COMMIT_MSG@11
PS9, Line 11: optimize
Can you add some info about the tradeoffs? My understanding is that while bucketing reduces data transfer, it can have two side effects:
- it can decrease parallelism, as each bucket will be processed by a single node + thread
- it can lead to remote reads, as split->node assignment happens based on the bucket instead of locality

http://gerrit.cloudera.org:8080/#/c/19430/9//COMMIT_MSG@13
PS9, Line 13:
Can you add some info about the effect on scheduling? My understanding is that we try to schedule each bucket to a separate executor. Is there some sort of affinity for this (e.g. assigning it to the executor with the most local blocks on HDFS), or is it done randomly?
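The multi-column and subset questions above come down to a key-matching rule. A minimal sketch of that rule, assuming a bucketed table's rows are placed by hashing only its bucketing columns; `BucketKeyMatcher` and its methods are hypothetical names, not code from this patch, and columns are plain strings rather than planner expressions:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical helper, not code from the patch: columns are plain strings here,
// whereas the planner would compare slot refs / exprs.
final class BucketKeyMatcher {
  private BucketKeyMatcher() {}

  // Join case: buildKeys.get(i) = probeKeys.get(i) is the i-th equi-join conjunct and
  // the build side is bucketed on bucketCols. Returns the probe-side keys in
  // bucketing-column order (the columns the probe side would be hash-partitioned on,
  // using the same hash the table was bucketed with), or empty if some bucketing
  // column is not a join key.
  static Optional<List<String>> probeKeysForBuckets(
      List<String> bucketCols, List<String> buildKeys, List<String> probeKeys) {
    if (buildKeys.size() != probeKeys.size()) {
      throw new IllegalArgumentException("join key lists must have the same length");
    }
    if (bucketCols.isEmpty()) return Optional.empty(); // not a bucketed table
    List<String> shuffleKeys = new ArrayList<>();
    for (String col : bucketCols) {
      int idx = buildKeys.indexOf(col);
      // Rows that agree on the join keys may differ in this bucketing column and
      // so live in different buckets: bucketing cannot be used.
      if (idx < 0) return Optional.empty();
      shuffleKeys.add(probeKeys.get(idx));
    }
    return Optional.of(shuffleKeys);
  }

  // Grouping case: if every bucketing column is also a grouping column, rows with
  // equal grouping keys share their bucketing values and therefore their bucket.
  static boolean groupingCompatible(List<String> bucketCols, List<String> groupingCols) {
    return !bucketCols.isEmpty() && groupingCols.containsAll(bucketCols);
  }
}
```

Under this rule, a join on (id, name) against a table bucketed by id can still use the buckets, while a table bucketed by (id, name) cannot be reused for a join on id alone.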

http://gerrit.cloudera.org:8080/#/c/19430/9/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
File fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java:

http://gerrit.cloudera.org:8080/#/c/19430/9/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java@324
PS9, Line 324: * TODO: take bucketing into account to produce a naturally hash-partitioned
             : * fragment
This TODO can be removed.

http://gerrit.cloudera.org:8080/#/c/19430/9/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java@510
PS9, Line 510: broadcast join or a repartitioning join
Can you mention the bucketed join as well?

http://gerrit.cloudera.org:8080/#/c/19430/9/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java@521
PS9, Line 521: and the children are placed in the same fragment to prevent shuffle.
Is this always the optimal solution? My understanding is that bucketing can lead to remote reads on HDFS, so if one of the scan nodes is very selective, then shuffling after evaluating the predicates leads to less network traffic than remotely reading the whole files.

http://gerrit.cloudera.org:8080/#/c/19430/9/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java@984
PS9, Line 984: // If the node is a bucketed node, put the node and its children in the same fragment
             : // to prevent shuffle
             : if (node.isBucketedNode()) {
             :   childFragment.addPlanRoot(node);
             :   return childFragment;
             : }
I think that this should be handled in createMergeAggregationFragment. Isn't this an optimization similar to childHasCompatPartition at line 1024?

http://gerrit.cloudera.org:8080/#/c/19430/9/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
File fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java:

http://gerrit.cloudera.org:8080/#/c/19430/9/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java@2539
PS9, Line 2539: kudu hash
Isn't it the Hive hash?
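One way to frame the question on line 521 is as a byte-count comparison for the bucketed side. The sketch below is a hypothetical heuristic, not Impala's cost model: it assumes a non-bucketed scan reads locally thanks to locality-aware scheduling, and that a shuffle sends roughly all rows surviving the scan's predicates over the network. `BucketCostSketch` and its parameters are illustrative names.

```java
// Hypothetical cost heuristic, not Impala's cost model.
final class BucketCostSketch {
  private BucketCostSketch() {}

  // scanBytes:      bytes the bucketed scan reads from storage
  // remoteFraction: share of those bytes read remotely because scan ranges are
  //                 assigned to executors by bucket rather than by block locality
  // outputBytes:    estimated bytes the scan emits after its predicates are applied
  // Returns true if keeping the scan in the join fragment (remote reads, no shuffle)
  // is estimated to move no more bytes over the network than a shuffle would.
  static boolean preferBucketed(long scanBytes, double remoteFraction, long outputBytes) {
    if (remoteFraction < 0.0 || remoteFraction > 1.0) {
      throw new IllegalArgumentException("remoteFraction must be in [0, 1]");
    }
    double remoteReadBytes = scanBytes * remoteFraction;
    // Assumed: without bucketing the scan reads locally, then roughly all
    // surviving rows are sent through the exchange.
    double shuffleBytes = outputBytes;
    return remoteReadBytes <= shuffleBytes;
  }
}
```

With remoteFraction = 0 the bucketed plan always wins, which matches the intuition that bucketing only loses when it gives up locality. A real decision would also need selectivity and row-size estimates, which the sketch takes as given.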

http://gerrit.cloudera.org:8080/#/c/19430/9/fe/src/main/java/org/apache/impala/planner/PlanNode.java
File fe/src/main/java/org/apache/impala/planner/PlanNode.java:

http://gerrit.cloudera.org:8080/#/c/19430/9/fe/src/main/java/org/apache/impala/planner/PlanNode.java@1203
PS9, Line 1203: int numExecutors = ExecutorMembershipSnapshot.getCluster().numExecutors();
I think that this can lead to incorrectly disabling bucketing, as numExecutors can be much larger than the number of executors this node will use in the end (without bucketing). E.g. if there are 100 nodes in the cluster and a table with 4 buckets with 1 file in each bucket, the scan can use at most 4 executors anyway, so bucketing does not reduce its maximum parallelism of 4, while it can still potentially make things faster.

http://gerrit.cloudera.org:8080/#/c/19430/9/fe/src/main/java/org/apache/impala/planner/SortNode.java
File fe/src/main/java/org/apache/impala/planner/SortNode.java:

http://gerrit.cloudera.org:8080/#/c/19430/9/fe/src/main/java/org/apache/impala/planner/SortNode.java@387
PS9, Line 387: // 如果Plan Node 需要分桶执行,则填充该参数,供后端使用
Can you translate this to English? (It reads roughly: "If the plan node needs bucketed execution, populate this parameter for the backend to use.")

http://gerrit.cloudera.org:8080/#/c/19430/9/testdata/datasets/functional/functional_schema_template.sql
File testdata/datasets/functional/functional_schema_template.sql:

http://gerrit.cloudera.org:8080/#/c/19430/9/testdata/datasets/functional/functional_schema_template.sql@3913
PS9, Line 3913: CLUSTERED BY(id)
Can you also add a test table that is bucketed by more than one column?
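The check suggested in the PlanNode.java comment compares bucketed parallelism with what the node would achieve without bucketing, rather than with the cluster size. A minimal sketch, assuming each bucket is processed by exactly one instance and that without bucketing the scan is capped by both the executor count and its number of scan ranges; `BucketParallelismSketch` and its methods are hypothetical names:

```java
// Hypothetical sketch, not the patch's PlanNode code.
final class BucketParallelismSketch {
  private BucketParallelismSketch() {}

  // Assumed: one scan instance per executor, and an instance needs at least one
  // scan range to do any work.
  static int parallelismWithoutBucketing(int numExecutors, int numScanRanges) {
    return Math.min(numExecutors, numScanRanges);
  }

  // Assumed: each bucket is processed by exactly one instance.
  static int parallelismWithBucketing(int numExecutors, int numBuckets) {
    return Math.min(numExecutors, numBuckets);
  }

  // Bucketing only costs parallelism if it caps the node below what it would
  // reach anyway, not merely below the cluster size.
  static boolean bucketingReducesParallelism(
      int numExecutors, int numScanRanges, int numBuckets) {
    return parallelismWithBucketing(numExecutors, numBuckets)
        < parallelismWithoutBucketing(numExecutors, numScanRanges);
  }
}
```

For the 100-node example in the review (4 buckets, one single-range file each), both sides come out as 4, so bucketing would not be disabled; a check against numExecutors alone (4 < 100) would disable it.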

http://gerrit.cloudera.org:8080/#/c/19430/9/testdata/workloads/functional-planner/queries/PlannerTest/bucket-shuffle.test
File testdata/workloads/functional-planner/queries/PlannerTest/bucket-shuffle.test:

http://gerrit.cloudera.org:8080/#/c/19430/9/testdata/workloads/functional-planner/queries/PlannerTest/bucket-shuffle.test@9
PS9, Line 9: 05:EXCHANGE [UNPARTITIONED]
           : |
           : 03:AGGREGATE [FINALIZE]
           : |  output: count(b.id), count(b.string_col)
           : |  row-size=16B cardinality=1
I don't understand this part of the plan: shouldn't there be a pre-aggregation first, in the same fragment as the join, and then a final aggregation in the same fragment as the plan root sink?

http://gerrit.cloudera.org:8080/#/c/19430/9/testdata/workloads/functional-planner/queries/PlannerTest/bucket-shuffle.test@28
PS9, Line 28: HDFS partitions=1/1 files=4 size=48.82KB
For bucketed tables, it could be useful to add something like buckets=4/4 here.

--
To view, visit http://gerrit.cloudera.org:8080/19430

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: If321e7987bc88374d79500cffb77ea25b2ed0316
Gerrit-Change-Number: 19430
Gerrit-PatchSet: 9
Gerrit-Owner: Baike Xia <[email protected]>
Gerrit-Reviewer: Csaba Ringhofer <[email protected]>
Gerrit-Reviewer: Impala Public Jenkins <[email protected]>
Gerrit-Reviewer: Quanlong Huang <[email protected]>
Gerrit-Comment-Date: Mon, 30 Jan 2023 18:01:03 +0000
Gerrit-HasComments: Yes
