Zoltan Borok-Nagy has uploaded this change for review. ( http://gerrit.cloudera.org:8080/20078 )
Change subject: IMPALA-11877: (part 2) Add support for DELETE statements for PARTITIONED Iceberg tables
......................................................................

IMPALA-11877: (part 2) Add support for DELETE statements for PARTITIONED Iceberg tables

This patch adds support for the DELETE operation on partitioned Iceberg
tables. It does so by writing position delete files (the merge-on-read
strategy). The delete files contain the file path and file position of
the deleted records, and they must reside in the same partition as the
data files they refer to.

To execute a DELETE statement for a given table 'tbl', we essentially
perform an INSERT into the virtual DELETE table 'tbl-POSITION-DELETE':

from:
  DELETE FROM ice_t WHERE id = 42;
to:
  INSERT INTO ice_t-POSITION-DELETE
  SELECT INPUT__FILE__NAME, FILE__POSITION FROM ice_t WHERE id = 42;

The above was already true for unpartitioned Iceberg tables. If the
table is partitioned, we additionally need to shuffle the rows across
executors based on the partitions they belong to, then sort the rows by
partition (and also by 'file_path' and 'pos'), so that writers can
process partitions sequentially. To do this, we need to select the
partition columns from the table as well. But with partition evolution,
each partition spec of the table has a different set of partition
columns. To overcome this, this patch set introduces two additional
virtual columns:

 * PARTITION__SPEC__ID
 * ICEBERG__PARTITION__SERIALIZED

PARTITION__SPEC__ID is an INT column that contains the Iceberg spec_id
for each row. ICEBERG__PARTITION__SERIALIZED is a BINARY column that
contains all partition values, base64-encoded and dot-separated.
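The overall mechanism (serialize the partition values, sort the deleted
rows by partition, start a new delete file on every partition change)
can be sketched roughly as below. This is an illustrative model only,
not Impala's implementation: `serialize_partition` and
`plan_delete_files` are hypothetical helpers, and the exact on-disk
serialization of ICEBERG__PARTITION__SERIALIZED is assumed to be a
simple base64-of-string encoding for the sake of the example.

```python
import base64
from itertools import groupby

def serialize_partition(values):
    # Illustrative stand-in for ICEBERG__PARTITION__SERIALIZED:
    # base64-encode each partition value, then dot-join the parts.
    # (Assumption: real Impala encoding details may differ.)
    return ".".join(
        base64.b64encode(str(v).encode("utf-8")).decode("ascii")
        for v in values)

def plan_delete_files(rows):
    # rows: (spec_id, partition_values, file_path, pos) per deleted record.
    # Sort by (spec_id, serialized partition, file_path, pos), then start a
    # new position-delete file whenever the (spec_id, partition) key
    # changes, mirroring the sink behavior described in this patch.
    keyed = sorted(
        (spec_id, serialize_partition(part), path, pos)
        for spec_id, part, path, pos in rows)
    files = []
    for (spec_id, part), grp in groupby(keyed, key=lambda r: (r[0], r[1])):
        files.append({"spec_id": spec_id, "partition": part,
                      "deletes": [(path, pos) for _, _, path, pos in grp]})
    return files
```

With this sketch, the single partition value 2 serializes to "Mg=="
(matching the example output shown below), and rows for the same
partition always end up adjacent, so each partition produces exactly
one delete file.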
E.g.:

  select PARTITION__SPEC__ID, ICEBERG__PARTITION__SERIALIZED, * FROM ice_t
  +---------------------+--------------------------------+---+---+
  | partition__spec__id | iceberg__partition__serialized | i | j |
  +---------------------+--------------------------------+---+---+
  | 0                   | Mg==                           | 2 | 2 |
  | 0                   | Mg==                           | 2 | 2 |
  +---------------------+--------------------------------+---+---+

So for the INSERT we shuffle the rows between executors based on
HASH(partition__spec__id, iceberg__partition__serialized); each writer
fragment then sorts the rows by (partition__spec__id,
iceberg__partition__serialized, file_path, pos) before writing them out
to delete files. The IcebergDeleteSink has been smartened up so that it
creates a new delete file whenever it sees a row with a new
(partition__spec__id, iceberg__partition__serialized) pair.

Some refactoring was also involved in implementing this patch set. A
lot of code shared between IcebergDeleteSink and HdfsTableSink has been
moved to the common base class TableSinkBase. In the frontend, this
patch set also moves some common code of InsertStmt and ModifyStmt to a
new common base class, DmlStatementBase.

Testing:
 * planner tests
 * e2e tests (including interop with Hive)
 * Did a manual stress test with TPCDS_3000.store_sales
 ** Table had 8 billion rows, partitioned by column (ss_sold_date_sk)
 ** Deleted 800 million rows using 10 Impala hosts
 ** Operation was successful, finished in under a minute
 ** Created the minimum number of delete files, i.e. one per partition

Change-Id: I28b06f240c23c336a7c5b6ef22fe2ee0a21f7b60
---
M be/src/exec/file-metadata-utils.cc
M be/src/exec/file-metadata-utils.h
M be/src/exec/hdfs-scan-node-base.cc
M be/src/exec/hdfs-table-sink.cc
M be/src/exec/hdfs-table-sink.h
M be/src/exec/hdfs-text-table-writer.cc
M be/src/exec/hdfs-text-table-writer.h
M be/src/exec/iceberg-delete-sink.cc
M be/src/exec/iceberg-delete-sink.h
M be/src/exec/output-partition.h
M be/src/exec/table-sink-base.cc
M be/src/exec/table-sink-base.h
M be/src/runtime/descriptors.cc
M be/src/runtime/descriptors.h
M be/src/runtime/dml-exec-state.cc
M common/fbs/IcebergObjects.fbs
M common/thrift/CatalogObjects.thrift
M common/thrift/DataSinks.thrift
M fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java
A fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java
M fe/src/main/java/org/apache/impala/analysis/IcebergPartitionSpec.java
M fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
M fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
M fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java
M fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
M fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
M fe/src/main/java/org/apache/impala/catalog/VirtualColumn.java
M fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
M fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java
M fe/src/main/java/org/apache/impala/planner/Planner.java
M fe/src/main/java/org/apache/impala/planner/TableSink.java
M fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
M fe/src/main/java/org/apache/impala/util/IcebergUtil.java
M testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-delete.test
A testdata/workloads/functional-query/queries/QueryTest/iceberg-delete-partitioned.test
M testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
M tests/query_test/test_iceberg.py
37 files changed, 1,513 insertions(+), 423 deletions(-)

  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/78/20078/1
--
To view, visit http://gerrit.cloudera.org:8080/20078
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newchange
Gerrit-Change-Id: I28b06f240c23c336a7c5b6ef22fe2ee0a21f7b60
Gerrit-Change-Number: 20078
Gerrit-PatchSet: 1
Gerrit-Owner: Zoltan Borok-Nagy <[email protected]>
