This is an automated email from the ASF dual-hosted git repository.

dbecker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 691604b1d IMPALA-12835: Fix event processing without hms_event_incremental_refresh_transactional_table
691604b1d is described below

commit 691604b1d1f0e5f0dc95fdb4976cf826135e08fb
Author: Csaba Ringhofer <csringho...@cloudera.com>
AuthorDate: Thu Mar 7 18:17:20 2024 +0100

    IMPALA-12835: Fix event processing without hms_event_incremental_refresh_transactional_table
    
    If hms_event_incremental_refresh_transactional_table is false, then
    for non-partitioned ACID tables Impala needs to rely on ALTER_TABLE
    events to detect INSERTs done in Hive. This patch changes the event
    processor so that it does not skip reloading files when processing an
    ALTER_TABLE event for this specific type of table (even if the changes
    to the table look trivial).
    
    Testing:
    - added a simple regression test
    
    Change-Id: I137b289f0e5f7c9c1947e2a3b30258c979f20987
    Reviewed-on: http://gerrit.cloudera.org:8080/21116
    Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 .../impala/catalog/events/MetastoreEvents.java     | 13 +++++++++++
 tests/custom_cluster/test_events_custom_configs.py | 25 ++++++++++++++++++++++
 2 files changed, 38 insertions(+)

diff --git 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index c62571912..1aa8b682a 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -1875,6 +1875,19 @@ public class MetastoreEvents {
       if (whitelistedTblProperties.isEmpty()) {
         return false;
       }
+
+      boolean incrementalAcidRefresh =
+          
BackendConfig.INSTANCE.getHMSEventIncrementalRefreshTransactionalTable();
+      boolean unpartitioned = afterTable.getPartitionKeysSize() == 0;
+      if (!incrementalAcidRefresh && unpartitioned
+          && AcidUtils.isTransactionalTable(afterTable.getParameters())) {
+        // In case of ACID tables no INSERT event is generated. If flag
+        // hms_event_incremental_refresh_transactional_table is false, then 
transaction
+        // related events are ignored (including COMMIT_TXN), so Impala has to 
rely on
+        // ALTER_TABLE events to detect INSERTs to unpartitioned tables 
(IMPALA-12835).
+        return false;
+      }
+
       // There are lot of other alter statements which doesn't require file 
metadata
       // reload but these are the most common types for alter statements.
       if (isFieldSchemaChanged(beforeTable, afterTable) ||
diff --git a/tests/custom_cluster/test_events_custom_configs.py 
b/tests/custom_cluster/test_events_custom_configs.py
index dafcb5854..0b943ba23 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -1219,3 +1219,28 @@ class 
TestEventProcessingCustomConfigs(CustomClusterTestSuite):
     # finish than 100s (e.g. I saw a run of 5mins).
     # self.assert_catalogd_log_contains("INFO", "Not added ABORTED write id 1 
since it's "
     #    + "not opened and might already be cleaned up")
+
+  @CustomClusterTestSuite.with_args(
+      
catalogd_args="--hms_event_incremental_refresh_transactional_table=false")
+  def test_no_hms_event_incremental_refresh_transactional_table(self, 
unique_database):
+    """IMPALA-12835: Test that Impala notices inserts to acid tables when
+       hms_event_incremental_refresh_transactional_table is false.
+    """
+    for partitioned in [False, True]:
+      tbl = "part_tbl" if partitioned else "tbl"
+      fq_tbl = unique_database + '.' + tbl
+      part_create = " partitioned by (p int)" if partitioned else ""
+      part_insert = " partition (p = 1)" if partitioned else ""
+
+      self.run_stmt_in_hive(
+          "create transactional table {} (i int){}".format(fq_tbl, 
part_create))
+      EventProcessorUtils.wait_for_event_processing(self)
+
+      # Load the table in Impala before INSERT
+      self.client.execute("refresh " + fq_tbl)
+      self.run_stmt_in_hive(
+          "insert into {}{} values (1),(2),(3)".format(fq_tbl, part_insert))
+      EventProcessorUtils.wait_for_event_processing(self)
+
+      results = self.client.execute("select i from " + fq_tbl)
+      assert results.data == ["1", "2", "3"]

Reply via email to