Repository: kudu Updated Branches: refs/heads/master a65e58ec0 -> ca3e38759
catalog-manager: refactor AlterTable and DeleteTable methods This commit splits CatalogManager::AlterTable and CatalogManager::DeleteTable in two. One method handles RPC specifics, and the second handles applying the alter/delete operation to the Kudu catalog. The RPC-handling method thus calls into the method which modifies the catalog. A follow-up commit in the HMS integration series will add another front-end method specific to HMS notification log listener events. Change-Id: Ia384768ee7246411052ccadc66c33e83b541c195 Reviewed-on: http://gerrit.cloudera.org:8080/10378 Tested-by: Kudu Jenkins Reviewed-by: Dan Burkert <danburk...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/ca3e3875 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/ca3e3875 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/ca3e3875 Branch: refs/heads/master Commit: ca3e387591d1947c24f9af814bb6429447902db5 Parents: a65e58e Author: Dan Burkert <danburk...@apache.org> Authored: Thu May 10 13:01:33 2018 -0700 Committer: Dan Burkert <danburk...@apache.org> Committed: Mon May 14 19:20:05 2018 +0000 ---------------------------------------------------------------------- src/kudu/integration-tests/alter_table-test.cc | 2 +- src/kudu/master/catalog_manager.cc | 75 ++++++++++++--------- src/kudu/master/catalog_manager.h | 25 ++++--- src/kudu/master/master_service.cc | 4 +- 4 files changed, 62 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/ca3e3875/src/kudu/integration-tests/alter_table-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/alter_table-test.cc b/src/kudu/integration-tests/alter_table-test.cc index f771209..640f1cb 100644 --- a/src/kudu/integration-tests/alter_table-test.cc +++ b/src/kudu/integration-tests/alter_table-test.cc @@ -344,7 +344,7 @@ TEST_F(AlterTableTest, TestAddNotNullableColumnWithoutDefaults) { cluster_->mini_master()->master()->catalog_manager(); master::CatalogManager::ScopedLeaderSharedLock l(catalog); ASSERT_OK(l.first_failed_status()); - Status s = catalog->AlterTable(&req, &resp, nullptr); + Status s = catalog->AlterTableRpc(req, &resp, nullptr); ASSERT_TRUE(s.IsInvalidArgument()); ASSERT_STR_CONTAINS(s.ToString(), "column `c2`: NOT NULL columns must have a default"); } http://git-wip-us.apache.org/repos/asf/kudu/blob/ca3e3875/src/kudu/master/catalog_manager.cc ---------------------------------------------------------------------- diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc index 174c7fc..6438901 100644 --- a/src/kudu/master/catalog_manager.cc +++ b/src/kudu/master/catalog_manager.cc @@ -1700,19 +1700,22 @@ Status CatalogManager::FindAndLockTable(const ReqClass& request, return Status::OK(); } -Status CatalogManager::DeleteTable(const DeleteTableRequestPB* req, - DeleteTableResponsePB* resp, - rpc::RpcContext* rpc) { +Status CatalogManager::DeleteTableRpc(const DeleteTableRequestPB& req, + DeleteTableResponsePB* resp, + rpc::RpcContext* rpc) { + LOG(INFO) << Substitute("Servicing DeleteTable request from $0:\n$1", + RequestorString(rpc), SecureShortDebugString(req)); + return DeleteTable(req, resp); +} + +Status CatalogManager::DeleteTable(const DeleteTableRequestPB& req, DeleteTableResponsePB* resp) { leader_lock_.AssertAcquiredForReading(); RETURN_NOT_OK(CheckOnline()); - LOG(INFO) << Substitute("Servicing DeleteTable request from $0:\n$1", - RequestorString(rpc), SecureShortDebugString(*req)); - // 1. Look up the table, lock it, and mark it as removed. scoped_refptr<TableInfo> table; TableMetadataLock l; - RETURN_NOT_OK(FindAndLockTable(*req, resp, LockMode::WRITE, &table, &l)); + RETURN_NOT_OK(FindAndLockTable(req, resp, LockMode::WRITE, &table, &l)); if (l.data().is_deleted()) { return SetupError(Status::NotFound("the table was deleted", l.data().pb.state_msg()), resp, MasterErrorPB::TABLE_NOT_FOUND); @@ -1788,7 +1791,9 @@ Status CatalogManager::DeleteTable(const DeleteTableRequestPB* req, TRACE("Removing table from by-name map"); std::lock_guard<LockType> l_map(lock_); if (table_names_map_.erase(l.data().name()) != 1) { - PANIC_RPC(rpc, "Could not remove table from map, name=" + l.data().name()); + LOG(FATAL) << "Could not remove table " << table->ToString() + << " from map in response to DeleteTable request: " + << SecureShortDebugString(req); } } @@ -2064,19 +2069,22 @@ Status CatalogManager::ApplyAlterPartitioningSteps( return Status::OK(); } -Status CatalogManager::AlterTable(const AlterTableRequestPB* req, - AlterTableResponsePB* resp, - rpc::RpcContext* rpc) { +Status CatalogManager::AlterTableRpc(const AlterTableRequestPB& req, + AlterTableResponsePB* resp, + rpc::RpcContext* rpc) { + LOG(INFO) << Substitute("Servicing AlterTable request from $0:\n$1", + RequestorString(rpc), SecureShortDebugString(req)); + return AlterTable(req, resp); +} + +Status CatalogManager::AlterTable(const AlterTableRequestPB& req, AlterTableResponsePB* resp) { leader_lock_.AssertAcquiredForReading(); RETURN_NOT_OK(CheckOnline()); - LOG(INFO) << Substitute("Servicing AlterTable request from $0:\n$1", - RequestorString(rpc), SecureShortDebugString(*req)); - // 1. Group the steps into schema altering steps and partition altering steps. vector<AlterTableRequestPB::Step> alter_schema_steps; vector<AlterTableRequestPB::Step> alter_partitioning_steps; - for (const auto& step : req->alter_schema_steps()) { + for (const auto& step : req.alter_schema_steps()) { switch (step.type()) { case AlterTableRequestPB::ADD_COLUMN: case AlterTableRequestPB::DROP_COLUMN: @@ -2099,7 +2107,7 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req, // 2. Lookup the table, verify if it exists, and lock it for modification. scoped_refptr<TableInfo> table; TableMetadataLock l; - RETURN_NOT_OK(FindAndLockTable(*req, resp, LockMode::WRITE, &table, &l)); + RETURN_NOT_OK(FindAndLockTable(req, resp, LockMode::WRITE, &table, &l)); if (l.data().is_deleted()) { return SetupError( Status::NotFound("the table was deleted", l.data().pb.state_msg()), @@ -2131,43 +2139,43 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req, resp, MasterErrorPB::INVALID_SCHEMA)); // 4. Validate and try to acquire the new table name. - if (req->has_new_table_name()) { + if (req.has_new_table_name()) { RETURN_NOT_OK(SetupError( - ValidateIdentifier(req->new_table_name()).CloneAndPrepend("invalid table name"), + ValidateIdentifier(req.new_table_name()).CloneAndPrepend("invalid table name"), resp, MasterErrorPB::INVALID_SCHEMA)); std::lock_guard<LockType> catalog_lock(lock_); TRACE("Acquired catalog manager lock"); // Verify that the table does not exist. - scoped_refptr<TableInfo> other_table = FindPtrOrNull(table_names_map_, req->new_table_name()); + scoped_refptr<TableInfo> other_table = FindPtrOrNull(table_names_map_, req.new_table_name()); if (other_table != nullptr) { return SetupError( Status::AlreadyPresent(Substitute("table $0 already exists with id $1", - req->new_table_name(), table->id())), + req.new_table_name(), table->id())), resp, MasterErrorPB::TABLE_ALREADY_PRESENT); } // Reserve the new table name if possible. - if (!InsertIfNotPresent(&reserved_table_names_, req->new_table_name())) { + if (!InsertIfNotPresent(&reserved_table_names_, req.new_table_name())) { // ServiceUnavailable will cause the client to retry the create table // request. We don't want to outright fail the request with // 'AlreadyPresent', because a table name reservation can be rolled back // in the case of an error. Instead, we force the client to retry at a // later time. return SetupError(Status::ServiceUnavailable(Substitute( - "table name $0 is already reserved", req->new_table_name())), + "table name $0 is already reserved", req.new_table_name())), resp, MasterErrorPB::TABLE_ALREADY_PRESENT); } - l.mutable_data()->pb.set_name(req->new_table_name()); + l.mutable_data()->pb.set_name(req.new_table_name()); } // Ensure that we drop our reservation upon return. auto cleanup = MakeScopedCleanup([&] () { - if (req->has_new_table_name()) { + if (req.has_new_table_name()) { std::lock_guard<LockType> l(lock_); - CHECK_EQ(1, reserved_table_names_.erase(req->new_table_name())); + CHECK_EQ(1, reserved_table_names_.erase(req.new_table_name())); } }); @@ -2177,7 +2185,7 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req, if (!alter_partitioning_steps.empty()) { TRACE("Apply alter partitioning"); Schema client_schema; - RETURN_NOT_OK(SetupError(SchemaFromPB(req->schema(), &client_schema), + RETURN_NOT_OK(SetupError(SchemaFromPB(req.schema(), &client_schema), resp, MasterErrorPB::UNKNOWN_ERROR)); RETURN_NOT_OK(SetupError( ApplyAlterPartitioningSteps(l, table, client_schema, alter_partitioning_steps, @@ -2188,7 +2196,7 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req, // Set to true if columns are altered, added or dropped. bool has_schema_changes = !alter_schema_steps.empty(); // Set to true if there are schema changes, or the table is renamed. - bool has_metadata_changes = has_schema_changes || req->has_new_table_name(); + bool has_metadata_changes = has_schema_changes || req.has_new_table_name(); // Set to true if there are partitioning changes. bool has_partitioning_changes = !alter_partitioning_steps.empty(); // Set to true if metadata changes need to be applied to existing tablets. @@ -2227,7 +2235,7 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req, // HMS is available. We do not allow altering tables while the HMS is // unavailable, because that would cause the catalogs to become unsynchronized. if (hms_catalog_ != nullptr && has_metadata_changes) { - const string& new_name = req->has_new_table_name() ? req->new_table_name() : table_name; + const string& new_name = req.has_new_table_name() ? req.new_table_name() : table_name; Status s = hms_catalog_->AlterTable(table->id(), table_name, new_name, new_schema); if (!s.ok()) { @@ -2249,7 +2257,7 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req, << s.ToString(); return; } - const string& new_name = req->has_new_table_name() ? req->new_table_name() : table_name; + const string& new_name = req.has_new_table_name() ? req.new_table_name() : table_name; WARN_NOT_OK(hms_catalog_->AlterTable(table->id(), new_name, table_name, schema), "An error occurred while attempting to roll-back HMS table entry alteration"); @@ -2299,12 +2307,13 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req, // Take the global catalog manager lock in order to modify the global table // and tablets indices. std::lock_guard<LockType> lock(lock_); - if (req->has_new_table_name()) { + if (req.has_new_table_name()) { if (table_names_map_.erase(table_name) != 1) { - PANIC_RPC(rpc, Substitute( - "Could not remove table (name $0) from map", table_name)); + LOG(FATAL) << "Could not remove table " << table->ToString() + << " from map in response to AlterTable request: " + << SecureShortDebugString(req); } - InsertOrDie(&table_names_map_, req->new_table_name(), table); + InsertOrDie(&table_names_map_, req.new_table_name(), table); } // Insert new tablets into the global tablet map. After this, the tablets http://git-wip-us.apache.org/repos/asf/kudu/blob/ca3e3875/src/kudu/master/catalog_manager.h ---------------------------------------------------------------------- diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h index 957c3ea..447a391 100644 --- a/src/kudu/master/catalog_manager.h +++ b/src/kudu/master/catalog_manager.h @@ -533,21 +533,21 @@ class CatalogManager : public tserver::TabletReplicaLookupIf { Status IsCreateTableDone(const IsCreateTableDoneRequestPB* req, IsCreateTableDoneResponsePB* resp); - // Delete the specified table + // Delete the specified table in response to a DeleteTableRequest RPC. // // The RPC context is provided for logging/tracing purposes, // but this function does not itself respond to the RPC. - Status DeleteTable(const DeleteTableRequestPB* req, - DeleteTableResponsePB* resp, - rpc::RpcContext* rpc); + Status DeleteTableRpc(const DeleteTableRequestPB& req, + DeleteTableResponsePB* resp, + rpc::RpcContext* rpc); - // Alter the specified table + // Alter the specified table in response to an AlterTableRequest RPC. // // The RPC context is provided for logging/tracing purposes, // but this function does not itself respond to the RPC. - Status AlterTable(const AlterTableRequestPB* req, - AlterTableResponsePB* resp, - rpc::RpcContext* rpc); + Status AlterTableRpc(const AlterTableRequestPB& req, + AlterTableResponsePB* resp, + rpc::RpcContext* rpc); // Get the information about an in-progress alter operation // @@ -664,6 +664,15 @@ class CatalogManager : public tserver::TabletReplicaLookupIf { typedef std::unordered_map<std::string, scoped_refptr<TableInfo>> TableInfoMap; typedef std::unordered_map<std::string, scoped_refptr<TabletInfo>> TabletInfoMap; + // Delete the specified table in the catalog. + Status DeleteTable(const DeleteTableRequestPB& req, + DeleteTableResponsePB* resp) WARN_UNUSED_RESULT; + + + // Alter the specified table in the catalog. + Status AlterTable(const AlterTableRequestPB& req, + AlterTableResponsePB* resp) WARN_UNUSED_RESULT; + // Called by SysCatalog::SysCatalogStateChanged when this node // becomes the leader of a consensus configuration. Executes // PrepareForLeadershipTask() via 'worker_pool_'. http://git-wip-us.apache.org/repos/asf/kudu/blob/ca3e3875/src/kudu/master/master_service.cc ---------------------------------------------------------------------- diff --git a/src/kudu/master/master_service.cc b/src/kudu/master/master_service.cc index 5ccaa66..b71f470 100644 --- a/src/kudu/master/master_service.cc +++ b/src/kudu/master/master_service.cc @@ -332,7 +332,7 @@ void MasterServiceImpl::DeleteTable(const DeleteTableRequestPB* req, return; } - Status s = server_->catalog_manager()->DeleteTable(req, resp, rpc); + Status s = server_->catalog_manager()->DeleteTableRpc(*req, resp, rpc); CheckRespErrorOrSetUnknown(s, resp); rpc->RespondSuccess(); } @@ -345,7 +345,7 @@ void MasterServiceImpl::AlterTable(const AlterTableRequestPB* req, return; } - Status s = server_->catalog_manager()->AlterTable(req, resp, rpc); + Status s = server_->catalog_manager()->AlterTableRpc(*req, resp, rpc); CheckRespErrorOrSetUnknown(s, resp); rpc->RespondSuccess(); }