This is an automated email from the ASF dual-hosted git repository. dbecker pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
     new 691604b1d IMPALA-12835: Fix event processing without hms_event_incremental_refresh_transactional_table
691604b1d is described below

commit 691604b1d1f0e5f0dc95fdb4976cf826135e08fb
Author: Csaba Ringhofer <csringho...@cloudera.com>
AuthorDate: Thu Mar 7 18:17:20 2024 +0100

    IMPALA-12835: Fix event processing without hms_event_incremental_refresh_transactional_table

    If hms_event_incremental_refresh_transactional_table is false, then
    for non-partitioned ACID tables Impala needs to rely on alter table
    event to detect INSERTs in Hive.

    This patch changes the event processor to not skip reloading files
    when processing the alter table event for this specific type of
    table (even if the changes in the table look trivial).

    Testing:
    - added a simple regression test

    Change-Id: I137b289f0e5f7c9c1947e2a3b30258c979f20987
    Reviewed-on: http://gerrit.cloudera.org:8080/21116
    Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 .../impala/catalog/events/MetastoreEvents.java     | 13 +++++++++++
 tests/custom_cluster/test_events_custom_configs.py | 25 ++++++++++++++++++++++
 2 files changed, 38 insertions(+)

diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index c62571912..1aa8b682a 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -1875,6 +1875,19 @@ public class MetastoreEvents {
       if (whitelistedTblProperties.isEmpty()) {
         return false;
       }
+
+      boolean incrementalAcidRefresh =
+          BackendConfig.INSTANCE.getHMSEventIncrementalRefreshTransactionalTable();
+      boolean unpartitioned = afterTable.getPartitionKeysSize() == 0;
+      if (!incrementalAcidRefresh && unpartitioned
+          && AcidUtils.isTransactionalTable(afterTable.getParameters())) {
+        // In case of ACID tables no INSERT event is generated. If flag
+        // hms_event_incremental_refresh_transactional_table is false, then transaction
+        // related events are ignored (including COMMIT_TXN), so Impala has to rely on
+        // ALTER_TABLE events to detect INSERTs to unpartitioned tables (IMPALA-12835).
+        return false;
+      }
+
       // There are lot of other alter statements which doesn't require file metadata
       // reload but these are the most common types for alter statements.
       if (isFieldSchemaChanged(beforeTable, afterTable) ||
diff --git a/tests/custom_cluster/test_events_custom_configs.py b/tests/custom_cluster/test_events_custom_configs.py
index dafcb5854..0b943ba23 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -1219,3 +1219,28 @@ class TestEventProcessingCustomConfigs(CustomClusterTestSuite):
     # finish than 100s (e.g. I saw a run of 5mins).
     # self.assert_catalogd_log_contains("INFO", "Not added ABORTED write id 1 since it's "
     #   + "not opened and might already be cleaned up")
+
+  @CustomClusterTestSuite.with_args(
+    catalogd_args="--hms_event_incremental_refresh_transactional_table=false")
+  def test_no_hms_event_incremental_refresh_transactional_table(self, unique_database):
+    """IMPALA-12835: Test that Impala notices inserts to acid tables when
+       hms_event_incremental_refresh_transactional_table is false.
+    """
+    for partitioned in [False, True]:
+      tbl = "part_tbl" if partitioned else "tbl"
+      fq_tbl = unique_database + '.' + tbl
+      part_create = " partitioned by (p int)" if partitioned else ""
+      part_insert = " partition (p = 1)" if partitioned else ""
+
+      self.run_stmt_in_hive(
+          "create transactional table {} (i int){}".format(fq_tbl, part_create))
+      EventProcessorUtils.wait_for_event_processing(self)
+
+      # Load the table in Impala before INSERT
+      self.client.execute("refresh " + fq_tbl)
+      self.run_stmt_in_hive(
+          "insert into {}{} values (1),(2),(3)".format(fq_tbl, part_insert))
+      EventProcessorUtils.wait_for_event_processing(self)
+
+      results = self.client.execute("select i from " + fq_tbl)
+      assert results.data == ["1", "2", "3"]