Repository: kudu
Updated Branches:
  refs/heads/master b5f3d1a10 -> 5f1ca32f3


http://git-wip-us.apache.org/repos/asf/kudu/blob/5f1ca32f/src/kudu/master/hms_notification_log_listener.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/hms_notification_log_listener.h b/src/kudu/master/hms_notification_log_listener.h
new file mode 100644
index 0000000..c4e152d
--- /dev/null
+++ b/src/kudu/master/hms_notification_log_listener.h
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <vector>
+
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/condition_variable.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/status.h"
+#include "kudu/util/status_callback.h"
+
+namespace hive {
+class NotificationEvent;
+}
+
+namespace kudu {
+
+class MonoTime;
+class Thread;
+
+namespace master {
+
+class CatalogManager;
+
+// A CatalogManager background task which listens for events occurring in the
+// Hive Metastore, and synchronizes the Kudu catalog accordingly.
+//
+// As a background task, the lifetime of an instance of this class must be less
+// than the catalog manager it belongs to.
+//
+// The notification log listener task continuously wakes up according to its
+// configured poll period, however it performs no work when the master is a
+// follower.
+//
+// When a change to the Kudu catalog is performed in response to a notification
+// log event, the corresponding event ID is recorded in the sys catalog as the
+// latest handled event. This ensures that masters do not double-apply
+// notification events as leadership changes.
+//
+// The notification log listener listens for two types of events on Kudu tables:
+//
+// - ALTER TABLE RENAME
+//    Table rename is a special case of ALTER TABLE. The notification log
+//    listener listens for rename event notifications for Kudu tables, and
+//    renames the corresponding Kudu table. See below for why renames can be
+//    applied back to Kudu, but not other types of alterations.
+//
+// - DROP TABLE
+//    The notification log listener listens for drop table events for Kudu
+//    tables, and drops the corresponding Kudu table. This allows the catalogs
+//    to stay synchronized when DROP TABLE and DROP DATABASE CASCADE Hive
+//    commands are executed.
+//
+// The notification log listener can support renaming and dropping tables in a
+// safe manner because the Kudu table ID is stored in the HMS table entry. Using
+// the Kudu table ID, the exact table which the event applies to can always be
+// identified. For other changes made in ALTER TABLE statements, such as ALTER
+// TABLE DROP COLUMN, there is no way to identify with certainty which column
+// has been dropped, since we do not store column IDs in the HMS table entries.
+class HmsNotificationLogListenerTask {
+ public:
+
+  explicit HmsNotificationLogListenerTask(CatalogManager* catalog_manager);
+  ~HmsNotificationLogListenerTask();
+
+  // Initializes the HMS notification log listener. When invoking this method,
+  // the catalog manager must be in the process of initializing.
+  Status Init() WARN_UNUSED_RESULT;
+
+  // Shuts down the HMS notification log listener. This must be called before
+  // shutting down the catalog manager.
+  void Shutdown();
+
+  // Waits for the notification log listener to process the latest notification
+  // log event.
+  //
+  // Note: an error will be returned if the listener is unable to retrieve the
+  // latest notifications from the HMS. If individual notifications are unable
+  // to be processed, no error will be returned.
+  Status WaitForCatchUp(const MonoTime& deadline) WARN_UNUSED_RESULT;
+
+ private:
+
+  // Runs the main loop of the listening thread.
+  void RunLoop();
+
+  // Polls the Hive Metastore for notification events, and handles them.
+  Status Poll();
+
+  // Handles an ALTER TABLE event. Must only be called on the listening thread.
+  //
+  // The event is parsed, and if it is a rename table event for a Kudu table,
+  // the table is renamed in the local catalog.  All other events are ignored.
+  Status HandleAlterTableEvent(const hive::NotificationEvent& event,
+                               int64_t* durable_event_id) WARN_UNUSED_RESULT;
+
+  // Handles a DROP TABLE event. Must only be called on the listening thread.
+  //
+  // The event is parsed, and if it is a drop table event for a Kudu table, the
+  // table is deleted in the local catalog. All other events are ignored.
+  Status HandleDropTableEvent(const hive::NotificationEvent& event,
+                              int64_t* durable_event_id) WARN_UNUSED_RESULT;
+
+  // The associated catalog manager.
+  //
+  // May be initialized to nullptr in the constructor to facilitate unit
+  // testing. In this case all interactions with the catalog manager and HMS
+  // are skipped.
+  CatalogManager* catalog_manager_;
+
+  // The listening thread.
+  scoped_refptr<kudu::Thread> thread_;
+
+  // Protects access to fields below.
+  mutable Mutex lock_;
+
+  // Set to true if the task is in the process of shutting down.
+  //
+  // Protected by lock_.
+  bool closing_;
+
+  // Manages waking the notification log listener thread when the catalog
+  // manager needs to ensure that all recent notification log events have been
+  // handled.
+  //
+  // Protected by lock_.
+  ConditionVariable wake_up_cv_;
+
+  // Queue of callbacks to execute when the notification log listener is caught
+  // up. These callbacks enable the catalog manager to wait for the notification
+  // log listener to have processed the latest events before proceeding with
+  // metadata ops involving the HMS table namespace.
+  //
+  // Protected by lock_.
+  std::vector<StatusCallback> catch_up_callbacks_;
+};
+
+} // namespace master
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/5f1ca32f/src/kudu/master/master.proto
----------------------------------------------------------------------
diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto
index 17fc02f..1b59ef0 100644
--- a/src/kudu/master/master.proto
+++ b/src/kudu/master/master.proto
@@ -204,6 +204,12 @@ message SysTskEntryPB {
   required security.TokenSigningPrivateKeyPB tsk = 1;
 }
 
+// The on-disk entry in the sys.catalog table ("metadata" column) to represent
+// the latest processed Hive Metastore notification log event ID.
+message SysNotificationLogEventIdPB {
+  optional int64 latest_notification_log_event_id = 1;  // Expected set and >= 0.
+}
+
 ////////////////////////////////////////////////////////////
 // RPCs
 ////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/kudu/blob/5f1ca32f/src/kudu/master/sys_catalog.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index 3cc8aa4..dd36fae 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -134,6 +134,8 @@ const char* const SysCatalogTable::kSysCertAuthorityEntryId =
     "root-ca-info";
 const char* const SysCatalogTable::kInjectedFailureStatusMsg =
     "INJECTED FAILURE";
+const char* const SysCatalogTable::kLatestNotificationLogEntryIdRowId =
+  "latest_notification_log_entry_id";  // Persisted sys catalog row key; do not rename.
 
 namespace {
 
@@ -506,6 +508,10 @@ Status SysCatalogTable::Write(const Actions& actions) {
   ReqUpdateTablets(&req, actions.tablets_to_update);
   ReqDeleteTablets(&req, actions.tablets_to_delete);
 
+  if (actions.hms_notification_log_event_id) {
+    ReqSetNotificationLogEventId(&req, *actions.hms_notification_log_event_id);
+  }
+
   if (req.row_operations().rows().empty()) {
     // No actual changes were written (i.e the data to be updated matched the
     // previous version of the data).
@@ -648,6 +654,24 @@ Status SysCatalogTable::VisitTskEntries(TskEntryVisitor* visitor) {
   return ProcessRows<SysTskEntryPB, TSK_ENTRY>(processor);
 }
 
+Status SysCatalogTable::GetLatestNotificationLogEventId(int64_t* event_id) {
+  TRACE_EVENT0("master", "SysCatalogTable::GetLatestNotificationLogEventId");
+
+  *event_id = -1;  // Left as -1 when no latest-event row has been written yet.
+  auto processor = [&](const string& entry_id, const SysNotificationLogEventIdPB& entry_data) {
+    if (entry_id != kLatestNotificationLogEntryIdRowId) {
+      // This is not the row we're looking for.
+      return Status::OK();
+    }
+    DCHECK(entry_data.has_latest_notification_log_event_id());
+    DCHECK_GE(entry_data.latest_notification_log_event_id(), 0);
+    *event_id = entry_data.latest_notification_log_event_id();
+    return Status::OK();
+  };
+
+  return ProcessRows<SysNotificationLogEventIdPB, HMS_NOTIFICATION_LOG>(processor);
+}
+
 Status SysCatalogTable::GetCertAuthorityEntry(SysCertAuthorityEntryPB* entry) {
   CHECK(entry);
   vector<SysCertAuthorityEntryPB> entries;
@@ -789,6 +813,20 @@ void SysCatalogTable::ReqDeleteTablets(WriteRequestPB* req,
   }
 }
 
+void SysCatalogTable::ReqSetNotificationLogEventId(WriteRequestPB* req, int64_t event_id) {
+  SysNotificationLogEventIdPB pb;
+  pb.set_latest_notification_log_event_id(event_id);
+  faststring metadata_buf;
+  pb_util::SerializeToString(pb, &metadata_buf);
+
+  KuduPartialRow row(&schema_);
+  RowOperationsPBEncoder enc(req->mutable_row_operations());
+  CHECK_OK(row.SetInt8(kSysCatalogTableColType, HMS_NOTIFICATION_LOG));
+  CHECK_OK(row.SetStringNoCopy(kSysCatalogTableColId, kLatestNotificationLogEntryIdRowId));
+  CHECK_OK(row.SetStringNoCopy(kSysCatalogTableColMetadata, metadata_buf));
+  enc.Add(RowOperationsPB::UPSERT, row);  // UPSERT overwrites any previously stored ID.
+}
+
 Status SysCatalogTable::VisitTablets(TabletVisitor* visitor) {
   TRACE_EVENT0("master", "SysCatalogTable::VisitTablets");
   auto processor = [&](

http://git-wip-us.apache.org/repos/asf/kudu/blob/5f1ca32f/src/kudu/master/sys_catalog.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/sys_catalog.h b/src/kudu/master/sys_catalog.h
index 8aa92eb..2b0f56f 100644
--- a/src/kudu/master/sys_catalog.h
+++ b/src/kudu/master/sys_catalog.h
@@ -23,12 +23,14 @@
 #include <string>
 #include <vector>
 
+#include <boost/optional/optional.hpp>
 #include <gtest/gtest_prod.h>
 
 #include "kudu/common/schema.h"
 #include "kudu/consensus/metadata.pb.h"
 #include "kudu/gutil/callback.h"
 #include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/master/catalog_manager.h"
 #include "kudu/tablet/tablet_replica.h"
@@ -97,6 +99,7 @@ class TskEntryVisitor {
 //   * root CA (certificate authority) certificate of the Kudu IPKI
 //   * Kudu IPKI root CA cert's private key
 //   * TSK (Token Signing Key) entries
+//   * Latest handled Hive Metastore notification log event ID
 //
 // The essential properties of the SysCatalogTable are:
 //   * SysCatalogTable has only one tablet.
@@ -111,6 +114,9 @@ class SysCatalogTable {
   // There should be no more than one entry of this type in the system table.
   static const char* const kSysCertAuthorityEntryId;
 
+  // Row ID of the HMS_NOTIFICATION_LOG row holding the latest processed event ID.
+  static const char* const kLatestNotificationLogEntryIdRowId;
+
   typedef Callback<Status()> ElectedLeaderCallback;
 
   enum CatalogEntryType {
@@ -118,6 +124,7 @@ class SysCatalogTable {
     TABLETS_ENTRY = 2,
     CERT_AUTHORITY_INFO = 3,  // Kudu's root certificate authority entry.
     TSK_ENTRY = 4,            // Token Signing Key entry.
+    HMS_NOTIFICATION_LOG = 5, // HMS notification log latest event ID.
   };
 
   // 'leader_cb_' is invoked whenever this node is elected as a leader
@@ -152,6 +159,7 @@ class SysCatalogTable {
     std::vector<scoped_refptr<TabletInfo>> tablets_to_add;
     std::vector<scoped_refptr<TabletInfo>> tablets_to_update;
     std::vector<scoped_refptr<TabletInfo>> tablets_to_delete;
+    boost::optional<int64_t> hms_notification_log_event_id;
   };
   Status Write(const Actions& actions);
 
@@ -164,6 +172,9 @@ class SysCatalogTable {
   // Scan for TSK-related entries in the system table.
   Status VisitTskEntries(TskEntryVisitor* visitor);
 
+  // Get the latest processed HMS notification log event ID, or -1 if none is stored.
+  Status GetLatestNotificationLogEventId(int64_t* event_id) WARN_UNUSED_RESULT;
+
   // Retrive the CA entry (private key and certificate) from the system table.
   Status GetCertAuthorityEntry(SysCertAuthorityEntryPB* entry);
 
@@ -253,6 +264,9 @@ class SysCatalogTable {
   void ReqDeleteTablets(tserver::WriteRequestPB* req,
                         const std::vector<scoped_refptr<TabletInfo>>& tablets);
 
+  // Overwrite (upsert) the latest event ID in the table with the provided ID.
+  void ReqSetNotificationLogEventId(tserver::WriteRequestPB* req, int64_t event_id);
+
   static std::string TskSeqNumberToEntryId(int64_t seq_number);
 
   // Special string injected into SyncWrite() random failures (if enabled).

http://git-wip-us.apache.org/repos/asf/kudu/blob/5f1ca32f/src/kudu/util/test_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/test_util.cc b/src/kudu/util/test_util.cc
index f612393..c960441 100644
--- a/src/kudu/util/test_util.cc
+++ b/src/kudu/util/test_util.cc
@@ -419,9 +419,10 @@ Status WaitForBind(pid_t pid, uint16_t* port, const char* kind, MonoDelta timeou
   // The first line is the pid. We ignore it.
   // The second line is the file descriptor number. We ignore it.
   // The third line has the bind address and port.
+  // Subsequent lines show active connections.
   vector<string> lines = strings::Split(lsof_out, "\n");
   int32_t p = -1;
-  if (lines.size() != 3 ||
+  if (lines.size() < 3 ||
       lines[2].substr(0, 3) != "n*:" ||
       !safe_strto32(lines[2].substr(3), &p) ||
       p <= 0) {

Reply via email to