This is an automated email from the ASF dual-hosted git repository. joemcdonnell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit bbe3303ded1e7a796e6bc41648b89074c52b1e7e Author: Michael Smith <michael.sm...@cloudera.com> AuthorDate: Tue Apr 16 14:51:26 2024 -0700 IMPALA-13003: Handle Iceberg AlreadyExistsException When multiple coordinators attempt to create the same table concurrently with "if not exists", we still see AlreadyExistsException: Table was created concurrently: my_iceberg_tbl Iceberg throws its own version of AlreadyExistsException, but we avoid most code paths that would throw it because we first check HMS to see if the table exists before trying to create it. Updates createIcebergTable to handle Iceberg's AlreadyExistsException identically to the HMS AlreadyExistsException. Adds a test using DebugAction to simulate concurrent table creation. Change-Id: I847eea9297c9ee0d8e821fe1c87ea03d22f1d96e Reviewed-on: http://gerrit.cloudera.org:8080/21312 Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> --- .../apache/impala/service/CatalogOpExecutor.java | 31 +++++++++++++--------- .../java/org/apache/impala/util/DebugUtils.java | 7 +++++ tests/query_test/test_iceberg.py | 14 ++++++++++ 3 files changed, 39 insertions(+), 13 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java index 8fcb54d1b..85722ffe7 100644 --- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java @@ -445,6 +445,7 @@ public class CatalogOpExecutor { TDdlType ddlType = ddlRequest.ddl_type; try { boolean syncDdl = ddlRequest.getQuery_options().isSync_ddl(); + String debugAction = ddlRequest.getQuery_options().getDebug_action(); switch (ddlType) { case ALTER_DATABASE: TAlterDbParams alter_db_params = ddlRequest.getAlter_db_params(); @@ -456,8 +457,8 @@ public class CatalogOpExecutor { TAlterTableParams alter_table_params = ddlRequest.getAlter_table_params(); tTableName = Optional.of(alter_table_params.getTable_name()); catalogOpTracker_.increment(ddlRequest, tTableName); - alterTable(alter_table_params, ddlRequest.getQuery_options().getDebug_action(), - wantMinimalResult, response, catalogTimeline); + alterTable(alter_table_params, debugAction, wantMinimalResult, response, + catalogTimeline); break; case ALTER_VIEW: TCreateOrAlterViewParams alter_view_params = ddlRequest.getAlter_view_params(); @@ -478,14 +479,14 @@ public class CatalogOpExecutor { tTableName = Optional.of(create_table_as_select_params.getTable_name()); catalogOpTracker_.increment(ddlRequest, tTableName); response.setNew_table_created(createTable(create_table_as_select_params, - response, catalogTimeline, syncDdl, wantMinimalResult)); + response, catalogTimeline, syncDdl, wantMinimalResult, debugAction)); break; case CREATE_TABLE: TCreateTableParams create_table_params = ddlRequest.getCreate_table_params(); tTableName = Optional.of((create_table_params.getTable_name())); catalogOpTracker_.increment(ddlRequest, tTableName); createTable(ddlRequest.getCreate_table_params(), response, catalogTimeline, - syncDdl, wantMinimalResult); + syncDdl, wantMinimalResult, debugAction); break; case CREATE_TABLE_LIKE: TCreateTableLikeParams create_table_like_params = @@ -493,7 +494,7 @@ public class CatalogOpExecutor { tTableName = Optional.of(create_table_like_params.getTable_name()); catalogOpTracker_.increment(ddlRequest, tTableName); createTableLike(create_table_like_params, response, catalogTimeline, syncDdl, - wantMinimalResult); + wantMinimalResult, debugAction); break; case CREATE_VIEW: TCreateOrAlterViewParams create_view_params = @@ -3511,8 +3512,8 @@ public class CatalogOpExecutor { * otherwise. */ private boolean createTable(TCreateTableParams params, TDdlExecResponse response, - EventSequence catalogTimeline, boolean syncDdl, boolean wantMinimalResult) - throws ImpalaException { + EventSequence catalogTimeline, boolean syncDdl, boolean wantMinimalResult, + @Nullable String debugAction) throws ImpalaException { Preconditions.checkNotNull(params); TableName tableName = TableName.fromThrift(params.getTable_name()); Preconditions.checkState(tableName != null && tableName.isFullyQualified()); @@ -3561,7 +3562,7 @@ public class CatalogOpExecutor { return createIcebergTable(tbl, wantMinimalResult, response, catalogTimeline, params.if_not_exists, params.getColumns(), params.getPartition_spec(), params.getPrimary_key_column_names(), params.getTable_properties(), - params.getComment()); + params.getComment(), debugAction); } Preconditions.checkState(params.getColumns().size() > 0, "Empty column list given as argument to Catalog.createTable"); @@ -3936,7 +3937,7 @@ public class CatalogOpExecutor { boolean wantMinimalResult, TDdlExecResponse response, EventSequence catalogTimeline, boolean ifNotExists, List<TColumn> columns, TIcebergPartitionSpec partitionSpec, List<String> primaryKeyColumnNames, Map<String, String> tableProperties, - String tblComment) throws ImpalaException { + String tblComment, @Nullable String debugAction) throws ImpalaException { Preconditions.checkState(IcebergTable.isIcebergTable(newTable)); acquireMetastoreDdlLock(catalogTimeline); @@ -3969,6 +3970,9 @@ public class CatalogOpExecutor { newTable); } } + if (debugAction != null) { + DebugUtils.executeDebugAction(debugAction, DebugUtils.ICEBERG_CREATE); + } String tableLoc = IcebergCatalogOpExecutor.createTable(catalog, IcebergUtil.getIcebergTableIdentifier(newTable), location, columns, partitionSpec, primaryKeyColumnNames, newTable.getOwner(), @@ -4042,7 +4046,8 @@ public class CatalogOpExecutor { newTbl.getFullName(), createEventId); addTableToCatalogUpdate(newTbl, wantMinimalResult, response.result); } catch (Exception e) { - if (e instanceof AlreadyExistsException && ifNotExists) { + if (ifNotExists && (e instanceof AlreadyExistsException || + e instanceof org.apache.iceberg.exceptions.AlreadyExistsException)) { addSummary(response, "Table already exists."); return false; } @@ -4064,8 +4069,8 @@ public class CatalogOpExecutor { * @param syncDdl tells is SYNC_DDL is enabled for this DDL request. */ private void createTableLike(TCreateTableLikeParams params, TDdlExecResponse response, - EventSequence catalogTimeline, boolean syncDdl, boolean wantMinimalResult) - throws ImpalaException { + EventSequence catalogTimeline, boolean syncDdl, boolean wantMinimalResult, + @Nullable String debugAction) throws ImpalaException { Preconditions.checkNotNull(params); THdfsFileFormat fileFormat = params.isSetFile_format() ? params.getFile_format() : null; @@ -4201,7 +4206,7 @@ public class CatalogOpExecutor { createIcebergTable(tbl, wantMinimalResult, response, catalogTimeline, params.if_not_exists, columns, partitionSpec, Lists.newArrayList(srcIceTable.getIcebergSchema().identifierFieldNames()), - tableProperties, params.getComment()); + tableProperties, params.getComment(), debugAction); } else if (srcTable instanceof KuduTable && KuduTable.isKuduTable(tbl)) { TCreateTableParams createTableParams = extractKuduCreateTableParams(params, tblName, (KuduTable) srcTable, tbl); diff --git a/fe/src/main/java/org/apache/impala/util/DebugUtils.java b/fe/src/main/java/org/apache/impala/util/DebugUtils.java index 9bb77f54b..3d9adb968 100644 --- a/fe/src/main/java/org/apache/impala/util/DebugUtils.java +++ b/fe/src/main/java/org/apache/impala/util/DebugUtils.java @@ -59,6 +59,9 @@ public class DebugUtils { // debug action label for Iceberg transaction commit. public static final String ICEBERG_COMMIT = "catalogd_iceberg_commit"; + // debug action label for Iceberg create table. + public static final String ICEBERG_CREATE = "catalogd_iceberg_create"; + // debug action label for throwing an exception during loadFileMetadataForPartitions. public static final String LOAD_FILE_METADATA_THROW_EXCEPTION = "catalogd_load_file_metadata_throw_exception"; @@ -173,6 +176,10 @@ public class DebugUtils { case "commitfailedexception": exceptionToThrow = new CommitFailedException(param); break; + case "icebergalreadyexistsexception": + exceptionToThrow = new org.apache.iceberg.exceptions. + AlreadyExistsException("Table already exists"); + break; default: LOG.error("Debug action exception class {} is not implemented", exceptionClazz); diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py index 6e4acd0c8..6c2d646f9 100644 --- a/tests/query_test/test_iceberg.py +++ b/tests/query_test/test_iceberg.py @@ -1342,6 +1342,20 @@ class TestIcebergTable(IcebergTestSuite): self.run_test_case('QueryTest/iceberg-migrate-from-external-hdfs-tables', vector, unique_database) + def test_table_exists(self, unique_database): + """Test that iceberg AlreadyExistsException are correctly handled.""" + tbl_name = unique_database + ".create_iceberg_exists" + # Attempt to create an iceberg table, simulating AlreadyExistsException + iceberg_created_options = {'debug_action': 'CATALOGD_ICEBERG_CREATE:EXCEPTION@' + 'IcebergAlreadyExistsException@Table was created concurrently'} + err = self.execute_query_expect_failure(self.client, + "create table {0} (i int) stored as iceberg".format(tbl_name), + query_options=iceberg_created_options) + assert "AlreadyExistsException: Table already exists" in str(err) + self.execute_query_expect_success(self.client, + "create table if not exists {0} (i int) stored as iceberg".format(tbl_name), + query_options=iceberg_created_options) + def test_abort_transaction(self, unique_database): """Test that iceberg operations fail correctly when an Iceberg transaction commit fails, and that the effects of the failed operation are not visible."""