Hello Impala Public Jenkins,

I'd like you to reexamine a change. Please visit

    http://gerrit.cloudera.org:8080/20548

to look at the new patch set (#2).

Change subject: IMPALA-12308: DIRECTED distribution mode for V2 Iceberg tables
......................................................................

IMPALA-12308: DIRECTED distribution mode for V2 Iceberg tables

For Iceberg tables, when joining the data files with the delete files,
both of the current distribution modes (broadcast, partitioned) are
wasteful. Each row read from a delete file contains the name of the
data file that it refers to, so if we know where that data file is
scheduled, we can send the delete file row directly there.

This patch enhances the scheduler to collect the information about
which data file is scheduled on which host. Since the scan node for
the data files is on the same host as the Iceberg join node, we can
send the delete files directly to that specific host.

Functional testing:
  - Re-run full test suite to check for regressions.

Performance testing:
1) Local machine: SELECT COUNT(1) FROM TPCH10_parquet.lineitem
   Around 15% of the rows are deleted.
   As the table is unpartitioned, I got a small number of delete files
   with relatively large size.
   Query runtime decreased by ~80%
2) Local machine: SELECT COUNT(1) FROM TPCDS10_store_sales
   Around 15% of the rows are deleted.
   The table is partitioned, which results in more delete files that
   are smaller in size.
   Query runtime decreased by ~50%
3) MOW-DEV cdw-master + this patch: SELECT COUNT(1) FROM a scaled
   store_sales table having ~8.6B rows, of which ~15% are deleted.
   Here we had 2 scenarios:
   a) Table is written by Impala: The runtime decreased by ~80%. One
      delete file row is sent to exactly one host.
   b) Table is written by Hive: The runtime decreased by ~60%. Here
      apparently the data files are bigger and one data file might be
      spread across multiple scan ranges. As a result, one delete file
      row might be sent to multiple hosts.
      The runtime difference compared to run a) is the time spent on
      sending out more delete file rows.

Change-Id: I212afd7c9e94551a1c50a40ccb0e3c1f7ecdf3d2
---
M be/src/exec/data-sink.h
M be/src/runtime/coordinator-backend-state.cc
M be/src/runtime/fragment-state.cc
M be/src/runtime/fragment-state.h
M be/src/runtime/krpc-data-stream-sender.cc
M be/src/runtime/krpc-data-stream-sender.h
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M common/protobuf/admission_control_service.proto
M common/protobuf/control_service.proto
M common/thrift/Partitions.thrift
M common/thrift/PlanNodes.thrift
M common/thrift/Query.thrift
M fe/src/main/java/org/apache/impala/planner/DataPartition.java
M fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
M fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
M fe/src/main/java/org/apache/impala/planner/IcebergDeleteNode.java
M fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
M fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
M fe/src/main/java/org/apache/impala/planner/JoinNode.java
M testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-delete.test
M testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-tables.test
M testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes.test
25 files changed, 490 insertions(+), 276 deletions(-)

  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/48/20548/2
--
To view, visit http://gerrit.cloudera.org:8080/20548
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I212afd7c9e94551a1c50a40ccb0e3c1f7ecdf3d2
Gerrit-Change-Number: 20548
Gerrit-PatchSet: 2
Gerrit-Owner: Gabor Kaszab <gaborkas...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
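
Illustrative note on the DIRECTED mode described in the commit message above: the routing idea can be sketched roughly as follows. This is not code from the patch (the real change lives in the C++ scheduler and KRPC sender). The function names `build_file_to_hosts` and `route_delete_rows`, the tuple shapes, and the choice to drop rows whose data file is not scheduled anywhere are all assumptions made for the example.

```python
from collections import defaultdict


def build_file_to_hosts(scan_ranges):
    """Map each data file path to the set of hosts scanning part of it.

    scan_ranges: iterable of (file_path, host) pairs, one per scheduled
    scan range (hypothetical shape, chosen for this sketch).
    """
    file_to_hosts = defaultdict(set)
    for file_path, host in scan_ranges:
        file_to_hosts[file_path].add(host)
    return file_to_hosts


def route_delete_rows(delete_rows, file_to_hosts):
    """Group position-delete rows by destination host.

    delete_rows: iterable of (file_path, pos) pairs read from delete files.
    A row goes to every host that scans a range of its data file; rows
    whose data file is not scheduled anywhere are dropped (an assumption
    of this sketch).
    """
    per_host = defaultdict(list)
    for file_path, pos in delete_rows:
        for host in sorted(file_to_hosts.get(file_path, ())):
            per_host[host].append((file_path, pos))
    return dict(per_host)


# One data file on a single host (like scenario a) and one split across
# two scan ranges on different hosts (like scenario b).
ranges = [("a.parquet", "host1"), ("b.parquet", "host2"), ("b.parquet", "host3")]
rows = [("a.parquet", 5), ("b.parquet", 7)]
routed = route_delete_rows(rows, build_file_to_hosts(ranges))
```

In this toy input the row for `a.parquet` goes to one host only, while the row for `b.parquet` is duplicated to both hosts that scan a range of that file, which mirrors the extra rows sent in scenario b).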