Hello Tamas Mate, Gabor Kaszab, Impala Public Jenkins,

I'd like you to reexamine a change. Please visit

    http://gerrit.cloudera.org:8080/20078

to look at the new patch set (#4).

Change subject: IMPALA-11877: (part 2) Add support for DELETE statements for 
PARTITIONED Iceberg tables
......................................................................

IMPALA-11877: (part 2) Add support for DELETE statements for PARTITIONED 
Iceberg tables

This patch adds support for the DELETE operation for partitioned
Iceberg tables. It does so by writing position delete files
(merge-on-read strategy). The delete files contain the file path
and file position of the deleted records. The delete files must
reside in the same partition as the data files they refer to.
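
For illustration, a position delete file is itself a data file with
'file_path' and 'pos' columns, e.g. (the paths here are hypothetical):

+-----------------------------------------------+-----+
| file_path                                     | pos |
+-----------------------------------------------+-----+
| hdfs://nn/warehouse/ice_t/data/p=1/f1.parquet | 17  |
| hdfs://nn/warehouse/ice_t/data/p=1/f1.parquet | 42  |
+-----------------------------------------------+-----+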

To execute a DELETE statement against a given table 'tbl', we
essentially do an INSERT into the virtual DELETE table
'tbl-POSITION-DELETE':

from:
 DELETE FROM ice_t WHERE id = 42;

to:
 INSERT INTO ice_t-POSITION-DELETE
 SELECT INPUT__FILE__NAME, FILE__POSITION
 FROM ice_t
 WHERE id = 42;

The above was already sufficient for unpartitioned Iceberg tables.

If the table is partitioned, we need to shuffle the rows across
executors based on the partitions they belong to, then sort the rows
by partition (and also by 'file_path' and 'pos'), so the writers can
work on the partitions sequentially.

To do this, we also need to select the partition columns from the
table. But with partition evolution, each partition spec of the table
can have a different set of partition columns. To overcome this, this
patch set introduces two additional virtual columns:

* PARTITION__SPEC__ID
* ICEBERG__PARTITION__SERIALIZED

PARTITION__SPEC__ID is an INT column that contains the Iceberg spec_id
for each row. ICEBERG__PARTITION__SERIALIZED is a BINARY column that
contains all partition values base64-encoded and dot-separated. E.g.:

SELECT PARTITION__SPEC__ID, ICEBERG__PARTITION__SERIALIZED, * FROM ice_t;
+---------------------+--------------------------------+---+---+
| partition__spec__id | iceberg__partition__serialized | i | j |
+---------------------+--------------------------------+---+---+
| 0                   | Mg==                           | 2 | 2 |
| 0                   | Mg==                           | 2 | 2 |
+---------------------+--------------------------------+---+---+

So for the INSERT we shuffle the rows between executors based on
HASH(partition__spec__id, iceberg__partition__serialized), then each
writer fragment sorts the rows on (partition__spec__id,
iceberg__partition__serialized, file_path, pos) before writing them out
to delete files. The IcebergDeleteSink has been smartened up so that
it creates a new delete file whenever it sees a row with a new
(partition__spec__id, iceberg__partition__serialized) pair.
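
Putting it together, if 'ice_t' is partitioned, the DELETE above is
conceptually executed as the following insert (a sketch; the virtual
table name and the internal column order are illustrative):

 INSERT INTO ice_t-POSITION-DELETE
 SELECT PARTITION__SPEC__ID, ICEBERG__PARTITION__SERIALIZED,
        INPUT__FILE__NAME, FILE__POSITION
 FROM ice_t
 WHERE id = 42;
 -- rows are exchanged on HASH(partition__spec__id,
 -- iceberg__partition__serialized); each writer sorts on
 -- (partition__spec__id, iceberg__partition__serialized,
 -- file_path, pos) and starts a new delete file whenever this
 -- key changes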

Some refactoring was also needed while implementing this patch set.
A lot of common code between IcebergDeleteSink and HdfsTableSink has
been moved to the common base class TableSinkBase. In the Frontend this
patch set also moves some common code of InsertStmt and ModifyStmt to a
new common base class DmlStatementBase.

Testing:
  * planner tests
  * e2e tests (including interop with Hive)
  * manual stress test with TPCDS_3000.store_sales
  ** the table had 8 billion rows, partitioned by ss_sold_date_sk
  ** deleted 800 million rows using 10 Impala hosts
  ** the operation succeeded and finished in under a minute
  ** it created the minimum number of delete files, i.e. one per partition

Change-Id: I28b06f240c23c336a7c5b6ef22fe2ee0a21f7b60
---
M be/src/exec/file-metadata-utils.cc
M be/src/exec/file-metadata-utils.h
M be/src/exec/hdfs-scan-node-base.cc
M be/src/exec/hdfs-table-sink.cc
M be/src/exec/hdfs-table-sink.h
M be/src/exec/hdfs-text-table-writer.cc
M be/src/exec/hdfs-text-table-writer.h
M be/src/exec/iceberg-delete-sink.cc
M be/src/exec/iceberg-delete-sink.h
M be/src/exec/output-partition.h
M be/src/exec/table-sink-base.cc
M be/src/exec/table-sink-base.h
M be/src/runtime/descriptors.cc
M be/src/runtime/descriptors.h
M be/src/runtime/dml-exec-state.cc
M common/fbs/IcebergObjects.fbs
M common/thrift/CatalogObjects.thrift
M common/thrift/DataSinks.thrift
M fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java
A fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java
M fe/src/main/java/org/apache/impala/analysis/IcebergPartitionSpec.java
M fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
M fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
M fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java
M fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
M fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
M fe/src/main/java/org/apache/impala/catalog/VirtualColumn.java
M fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
M fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java
M fe/src/main/java/org/apache/impala/planner/Planner.java
M fe/src/main/java/org/apache/impala/planner/TableSink.java
M fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
M fe/src/main/java/org/apache/impala/util/IcebergUtil.java
M testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-delete.test
A testdata/workloads/functional-query/queries/QueryTest/iceberg-delete-partitioned.test
M testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
M tests/query_test/test_iceberg.py
37 files changed, 1,522 insertions(+), 391 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/78/20078/4
--
To view, visit http://gerrit.cloudera.org:8080/20078
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I28b06f240c23c336a7c5b6ef22fe2ee0a21f7b60
Gerrit-Change-Number: 20078
Gerrit-PatchSet: 4
Gerrit-Owner: Zoltan Borok-Nagy <[email protected]>
Gerrit-Reviewer: Gabor Kaszab <[email protected]>
Gerrit-Reviewer: Impala Public Jenkins <[email protected]>
Gerrit-Reviewer: Tamas Mate <[email protected]>
Gerrit-Reviewer: Zoltan Borok-Nagy <[email protected]>
