This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit bbe3303ded1e7a796e6bc41648b89074c52b1e7e
Author: Michael Smith <michael.sm...@cloudera.com>
AuthorDate: Tue Apr 16 14:51:26 2024 -0700

    IMPALA-13003: Handle Iceberg AlreadyExistsException
    
    When multiple coordinators attempt to create the same table concurrently
    with "if not exists", we still see
    
      AlreadyExistsException: Table was created concurrently: my_iceberg_tbl
    
    Iceberg throws its own version of AlreadyExistsException, but we avoid
    most code paths that would throw it because we first check HMS to see if
    the table exists before trying to create it.
    
    Updates createIcebergTable to handle Iceberg's AlreadyExistsException
    identically to the HMS AlreadyExistsException.
    
    Adds a test using DebugAction to simulate concurrent table creation.
    
    Change-Id: I847eea9297c9ee0d8e821fe1c87ea03d22f1d96e
    Reviewed-on: http://gerrit.cloudera.org:8080/21312
    Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 .../apache/impala/service/CatalogOpExecutor.java   | 31 +++++++++++++---------
 .../java/org/apache/impala/util/DebugUtils.java    |  7 +++++
 tests/query_test/test_iceberg.py                   | 14 ++++++++++
 3 files changed, 39 insertions(+), 13 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 8fcb54d1b..85722ffe7 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -445,6 +445,7 @@ public class CatalogOpExecutor {
     TDdlType ddlType = ddlRequest.ddl_type;
     try {
       boolean syncDdl = ddlRequest.getQuery_options().isSync_ddl();
+      String debugAction = ddlRequest.getQuery_options().getDebug_action();
       switch (ddlType) {
         case ALTER_DATABASE:
           TAlterDbParams alter_db_params = ddlRequest.getAlter_db_params();
@@ -456,8 +457,8 @@ public class CatalogOpExecutor {
           TAlterTableParams alter_table_params = ddlRequest.getAlter_table_params();
           tTableName = Optional.of(alter_table_params.getTable_name());
           catalogOpTracker_.increment(ddlRequest, tTableName);
-          alterTable(alter_table_params, ddlRequest.getQuery_options().getDebug_action(),
-              wantMinimalResult, response, catalogTimeline);
+          alterTable(alter_table_params, debugAction, wantMinimalResult, response,
+              catalogTimeline);
           break;
         case ALTER_VIEW:
           TCreateOrAlterViewParams alter_view_params = ddlRequest.getAlter_view_params();
@@ -478,14 +479,14 @@ public class CatalogOpExecutor {
           tTableName = Optional.of(create_table_as_select_params.getTable_name());
           catalogOpTracker_.increment(ddlRequest, tTableName);
           response.setNew_table_created(createTable(create_table_as_select_params,
-              response, catalogTimeline, syncDdl, wantMinimalResult));
+              response, catalogTimeline, syncDdl, wantMinimalResult, debugAction));
           break;
         case CREATE_TABLE:
           TCreateTableParams create_table_params = ddlRequest.getCreate_table_params();
           tTableName = Optional.of((create_table_params.getTable_name()));
           catalogOpTracker_.increment(ddlRequest, tTableName);
           createTable(ddlRequest.getCreate_table_params(), response, catalogTimeline,
-              syncDdl, wantMinimalResult);
+              syncDdl, wantMinimalResult, debugAction);
           break;
         case CREATE_TABLE_LIKE:
           TCreateTableLikeParams create_table_like_params =
@@ -493,7 +494,7 @@ public class CatalogOpExecutor {
           tTableName = Optional.of(create_table_like_params.getTable_name());
           catalogOpTracker_.increment(ddlRequest, tTableName);
           createTableLike(create_table_like_params, response, catalogTimeline, syncDdl,
-              wantMinimalResult);
+              wantMinimalResult, debugAction);
           break;
         case CREATE_VIEW:
           TCreateOrAlterViewParams create_view_params =
@@ -3511,8 +3512,8 @@ public class CatalogOpExecutor {
    * otherwise.
    */
   private boolean createTable(TCreateTableParams params, TDdlExecResponse response,
-      EventSequence catalogTimeline, boolean syncDdl, boolean wantMinimalResult)
-      throws ImpalaException {
+      EventSequence catalogTimeline, boolean syncDdl, boolean wantMinimalResult,
+      @Nullable String debugAction) throws ImpalaException {
     Preconditions.checkNotNull(params);
     TableName tableName = TableName.fromThrift(params.getTable_name());
     Preconditions.checkState(tableName != null && tableName.isFullyQualified());
@@ -3561,7 +3562,7 @@ public class CatalogOpExecutor {
       return createIcebergTable(tbl, wantMinimalResult, response, catalogTimeline,
           params.if_not_exists, params.getColumns(), params.getPartition_spec(),
           params.getPrimary_key_column_names(), params.getTable_properties(),
-          params.getComment());
+          params.getComment(), debugAction);
     }
     Preconditions.checkState(params.getColumns().size() > 0,
         "Empty column list given as argument to Catalog.createTable");
@@ -3936,7 +3937,7 @@ public class CatalogOpExecutor {
       boolean wantMinimalResult, TDdlExecResponse response, EventSequence catalogTimeline,
       boolean ifNotExists, List<TColumn> columns, TIcebergPartitionSpec partitionSpec,
       List<String> primaryKeyColumnNames, Map<String, String> tableProperties,
-      String tblComment) throws ImpalaException {
+      String tblComment, @Nullable String debugAction) throws ImpalaException {
     Preconditions.checkState(IcebergTable.isIcebergTable(newTable));
 
     acquireMetastoreDdlLock(catalogTimeline);
@@ -3969,6 +3970,9 @@ public class CatalogOpExecutor {
                     newTable);
               }
             }
+            if (debugAction != null) {
+              DebugUtils.executeDebugAction(debugAction, DebugUtils.ICEBERG_CREATE);
+            }
             String tableLoc = IcebergCatalogOpExecutor.createTable(catalog,
                 IcebergUtil.getIcebergTableIdentifier(newTable), location, columns,
                 partitionSpec, primaryKeyColumnNames, newTable.getOwner(),
@@ -4042,7 +4046,8 @@ public class CatalogOpExecutor {
           newTbl.getFullName(), createEventId);
       addTableToCatalogUpdate(newTbl, wantMinimalResult, response.result);
     } catch (Exception e) {
-      if (e instanceof AlreadyExistsException && ifNotExists) {
+      if (ifNotExists && (e instanceof AlreadyExistsException ||
+          e instanceof org.apache.iceberg.exceptions.AlreadyExistsException)) {
         addSummary(response, "Table already exists.");
         return false;
       }
@@ -4064,8 +4069,8 @@ public class CatalogOpExecutor {
    * @param  syncDdl tells is SYNC_DDL is enabled for this DDL request.
    */
   private void createTableLike(TCreateTableLikeParams params, TDdlExecResponse response,
-      EventSequence catalogTimeline, boolean syncDdl, boolean wantMinimalResult)
-      throws ImpalaException {
+      EventSequence catalogTimeline, boolean syncDdl, boolean wantMinimalResult,
+      @Nullable String debugAction) throws ImpalaException {
     Preconditions.checkNotNull(params);
     THdfsFileFormat fileFormat =
         params.isSetFile_format() ? params.getFile_format() : null;
@@ -4201,7 +4206,7 @@ public class CatalogOpExecutor {
       createIcebergTable(tbl, wantMinimalResult, response, catalogTimeline,
           params.if_not_exists, columns, partitionSpec,
           Lists.newArrayList(srcIceTable.getIcebergSchema().identifierFieldNames()),
-          tableProperties, params.getComment());
+          tableProperties, params.getComment(), debugAction);
     } else if (srcTable instanceof KuduTable && KuduTable.isKuduTable(tbl)) {
       TCreateTableParams createTableParams =
           extractKuduCreateTableParams(params, tblName, (KuduTable) srcTable, tbl);
diff --git a/fe/src/main/java/org/apache/impala/util/DebugUtils.java b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
index 9bb77f54b..3d9adb968 100644
--- a/fe/src/main/java/org/apache/impala/util/DebugUtils.java
+++ b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
@@ -59,6 +59,9 @@ public class DebugUtils {
   // debug action label for Iceberg transaction commit.
   public static final String ICEBERG_COMMIT = "catalogd_iceberg_commit";
 
+  // debug action label for Iceberg create table.
+  public static final String ICEBERG_CREATE = "catalogd_iceberg_create";
+
   // debug action label for throwing an exception during loadFileMetadataForPartitions.
   public static final String LOAD_FILE_METADATA_THROW_EXCEPTION =
       "catalogd_load_file_metadata_throw_exception";
@@ -173,6 +176,10 @@ public class DebugUtils {
             case "commitfailedexception":
               exceptionToThrow = new CommitFailedException(param);
               break;
+            case "icebergalreadyexistsexception":
+              exceptionToThrow = new org.apache.iceberg.exceptions.
+                  AlreadyExistsException("Table already exists");
+              break;
             default:
               LOG.error("Debug action exception class {} is not implemented",
                   exceptionClazz);
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 6e4acd0c8..6c2d646f9 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -1342,6 +1342,20 @@ class TestIcebergTable(IcebergTestSuite):
       self.run_test_case('QueryTest/iceberg-migrate-from-external-hdfs-tables',
                          vector, unique_database)
 
+  def test_table_exists(self, unique_database):
+    """Test that iceberg AlreadyExistsException are correctly handled."""
+    tbl_name = unique_database + ".create_iceberg_exists"
+    # Attempt to create an iceberg table, simulating AlreadyExistsException
+    iceberg_created_options = {'debug_action': 'CATALOGD_ICEBERG_CREATE:EXCEPTION@'
+        'IcebergAlreadyExistsException@Table was created concurrently'}
+    err = self.execute_query_expect_failure(self.client,
+        "create table {0} (i int) stored as iceberg".format(tbl_name),
+        query_options=iceberg_created_options)
+    assert "AlreadyExistsException: Table already exists" in str(err)
+    self.execute_query_expect_success(self.client,
+        "create table if not exists {0} (i int) stored as 
iceberg".format(tbl_name),
+        query_options=iceberg_created_options)
+
   def test_abort_transaction(self, unique_database):
     """Test that iceberg operations fail correctly when an Iceberg transaction 
commit
     fails, and that the effects of the failed operation are not visible."""
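
For readers skimming the thread, the core of the change condenses to the pattern below. This is an illustrative sketch, not code from the patch: the class name, helper name, and Callable wrapper are hypothetical; only the two AlreadyExistsException types and the if-not-exists check come from the CatalogOpExecutor.java hunks above.

    import java.util.concurrent.Callable;
    import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;

    public final class CreateIfNotExistsSketch {
      /**
       * Runs a table-creation action and returns true if the table was created,
       * or false if IF NOT EXISTS was given and a concurrent creator won the race.
       */
      static boolean createIfNotExists(Callable<Void> doCreate, boolean ifNotExists)
          throws Exception {
        try {
          doCreate.call();  // may race with another coordinator creating the same table
          return true;
        } catch (Exception e) {
          // With IF NOT EXISTS, both the HMS and the Iceberg flavor of
          // AlreadyExistsException mean "someone else created it first", not an error.
          if (ifNotExists && (e instanceof AlreadyExistsException
              || e instanceof org.apache.iceberg.exceptions.AlreadyExistsException)) {
            return false;
          }
          throw e;
        }
      }
    }

The doCreate step here stands in for the Iceberg catalog createTable call, which can still fail after the earlier HMS existence check because another coordinator may create the table in between.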
