This is an automated email from the ASF dual-hosted git repository.

jpeach pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git

commit 6bcfa0888a996cd2d8523e422a0508c0ba2441a3
Author: James Peach <jpe...@apache.org>
AuthorDate: Wed May 24 22:49:29 2017 -0700

    Simplify ProcessManager message and shutdown handling.
    
    Simplify ProcessManager message handling to make it clearer when
    we are stopping and wen we are dealing with a process manager error
    pumping messages to traffic_manager. We do this by hoisting the
    message reading cose into a helper function that just deals with
    reading the data, and propagating the error up to the manager thread.
    At that point, we can more easily know whether we are shutting down
    or not.
---
 mgmt/BaseManager.cc    |   3 +-
 mgmt/ProcessManager.cc | 170 +++++++++++++++++++++++++++----------------------
 mgmt/ProcessManager.h  |   8 +--
 3 files changed, 101 insertions(+), 80 deletions(-)

diff --git a/mgmt/BaseManager.cc b/mgmt/BaseManager.cc
index 25eede8..7dab2aa 100644
--- a/mgmt/BaseManager.cc
+++ b/mgmt/BaseManager.cc
@@ -145,7 +145,8 @@ BaseManager::signalMgmtEntity(int msg_id, char *data_raw, 
int data_len)
     mh->msg_id   = msg_id;
     mh->data_len = 0;
   }
-  ink_assert(enqueue(mgmt_event_queue, mh));
+
+  ink_release_assert(enqueue(mgmt_event_queue, mh));
   return msg_id;
 
 } /* End BaseManager::signalMgmtEntity */
diff --git a/mgmt/ProcessManager.cc b/mgmt/ProcessManager.cc
index 7c5c4ea..f4616f3 100644
--- a/mgmt/ProcessManager.cc
+++ b/mgmt/ProcessManager.cc
@@ -36,6 +36,61 @@
  */
 inkcoreapi ProcessManager *pmgmt = nullptr;
 
+// read_management_message attempts to read a message from the management
+// socket. Returns -errno on error, otherwise 0. If a message was read the
+// *msg pointer will be filled in with the message that was read.
+static int
+read_management_message(int sockfd, ink_hrtime timeout, MgmtMessageHdr **msg)
+{
+  MgmtMessageHdr hdr;
+  int ret;
+
+  *msg = nullptr;
+
+  switch (mgmt_read_timeout(sockfd, ink_hrtime_to_sec(timeout), 0 /* usec */)) 
{
+  case 0:
+    // Timed out.
+    return 0;
+  case -1:
+    return -errno;
+  }
+
+  // We have a message, try to read the message header.
+  ret = mgmt_read_pipe(sockfd, reinterpret_cast<char *>(&hdr), 
sizeof(MgmtMessageHdr));
+  switch (ret) {
+  case 0:
+    // Received EOF.
+    return 0;
+  case sizeof(MgmtMessageHdr):
+    break;
+  default:
+    // Received -errno.
+    return ret;
+  }
+
+  size_t msg_size          = sizeof(MgmtMessageHdr) + hdr.data_len;
+  MgmtMessageHdr *full_msg = (MgmtMessageHdr *)ats_malloc(msg_size);
+
+  memcpy(full_msg, &hdr, sizeof(MgmtMessageHdr));
+  char *data_raw = reinterpret_cast<char *>(full_msg) + sizeof(MgmtMessageHdr);
+
+  ret = mgmt_read_pipe(sockfd, data_raw, hdr.data_len);
+  if (ret == 0) {
+    // Received EOF.
+    ats_free(full_msg);
+    return 0;
+  } else if (ret < 0) {
+    // Received -errno.
+    ats_free(full_msg);
+    return ret;
+  } else {
+    ink_release_assert(ret == hdr.data_len);
+    // Received the message.
+    *msg = full_msg;
+    return 0;
+  }
+}
+
 void
 ProcessManager::start(std::function<void()> const &cb)
 {
@@ -94,13 +149,28 @@ ProcessManager::processManagerThread(void *arg)
     pmgmt->init();
   }
 
+  // Start pumping messages between the local process and the process
+  // manager. This will terminate when the process manager terminates
+  // or the local process calls stop(). In either case, it is likely
+  // that we will first notice because we got a socket error, but in
+  // the latter case, the `running` flag has already been toggled so
+  // we know that we are really doing a shutdown.
   while (pmgmt->running) {
+    int ret;
+
     if (pmgmt->require_lm) {
-      pmgmt->pollLMConnection();
+      ret = pmgmt->pollLMConnection();
+      if (ret < 0 && pmgmt->running) {
+        Alert("exiting with read error from process manager: %s", 
strerror(-ret));
+      }
     }
 
     pmgmt->processEventQueue();
-    pmgmt->processSignalQueue();
+    ret = pmgmt->processSignalQueue();
+    if (ret < 0 && pmgmt->running) {
+      Alert("exiting with write error from process manager: %s", 
strerror(-ret));
+    }
+
     mgmt_sleep_sec(pmgmt->timeout);
   }
 
@@ -198,25 +268,25 @@ ProcessManager::processEventQueue()
   return ret;
 }
 
-bool
+int
 ProcessManager::processSignalQueue()
 {
-  bool ret = false;
-
   while (!queue_is_empty(mgmt_signal_queue)) {
     MgmtMessageHdr *mh = (MgmtMessageHdr *)dequeue(mgmt_signal_queue);
 
     Debug("pmgmt", "signaling local manager with message ID %d", mh->msg_id);
 
-    if (require_lm && mgmt_write_pipe(local_manager_sockfd, (char *)mh, 
sizeof(MgmtMessageHdr) + mh->data_len) <= 0) {
-      Fatal("error writing message: %s", strerror(errno));
-    } else {
+    if (require_lm) {
+      int ret = mgmt_write_pipe(local_manager_sockfd, (char *)mh, 
sizeof(MgmtMessageHdr) + mh->data_len);
       ats_free(mh);
-      ret = true;
+
+      if (ret < 0) {
+        return ret;
+      }
     }
   }
 
-  return ret;
+  return 0;
 }
 
 void
@@ -258,6 +328,7 @@ ProcessManager::initLMConnection()
   mh_full           = (MgmtMessageHdr *)alloca(sizeof(MgmtMessageHdr) + 
data_len);
   mh_full->msg_id   = MGMT_SIGNAL_PID;
   mh_full->data_len = data_len;
+
   memcpy((char *)mh_full + sizeof(MgmtMessageHdr), &(pid), data_len);
 
   if (mgmt_write_pipe(local_manager_sockfd, (char *)mh_full, 
sizeof(MgmtMessageHdr) + data_len) <= 0) {
@@ -265,82 +336,31 @@ ProcessManager::initLMConnection()
   }
 }
 
-void
+int
 ProcessManager::pollLMConnection()
 {
-  MgmtMessageHdr mh_hdr;
-  char *data_raw;
-
   // Avoid getting stuck enqueuing too many requests in a row, limit to 
MAX_MSGS_IN_A_ROW.
   int count;
-  for (count = 0; running && count < max_msgs_in_a_row; ++count) {
-    int num;
 
-    num = mgmt_read_timeout(local_manager_sockfd, 1 /* sec */, 0 /* usec */);
-    if (num == 0) {
-      // Nothing but a timeout. We are done for now.
-      break;
+  for (count = 0; running && count < max_msgs_in_a_row; ++count) {
+    MgmtMessageHdr *msg;
+    int ret = read_management_message(local_manager_sockfd, HRTIME_SECONDS(1), 
&msg);
+    if (ret < 0) {
+      return ret;
     }
 
-    if (num < 0) {
-      // Socket read error.
-      Debug("pmgmt", "socket select failed: %s", strerror(errno));
-      continue;
+    // No message, we are done polling. */
+    if (msg == nullptr) {
+      break;
     }
 
-    if (num > 0) {
-      // We have a message, try to read the message header.
-      int res = mgmt_read_pipe(local_manager_sockfd, (char *)&mh_hdr, 
sizeof(MgmtMessageHdr));
-
-      if (res < 0) {
-        if (running) {
-          // If we are no longer running, the socket might have been closed out
-          // from under us. At any rate, we don't care any more.
-          Fatal("socket read error: %s", strerror(errno));
-        }
-
-        break;
-      }
-
-      // Handle EOF from the manager. This is normal, so we log an Alert
-      // rather than a Fatal.
-      if (res == 0) {
-        close_socket(local_manager_sockfd);
-        Alert("exiting with EOF from process manager");
-      }
-
-      if (res > 0) {
-        size_t mh_full_size     = sizeof(MgmtMessageHdr) + mh_hdr.data_len;
-        MgmtMessageHdr *mh_full = (MgmtMessageHdr *)ats_malloc(mh_full_size);
-
-        memcpy(mh_full, &mh_hdr, sizeof(MgmtMessageHdr));
-        data_raw = (char *)mh_full + sizeof(MgmtMessageHdr);
-
-        res = mgmt_read_pipe(local_manager_sockfd, data_raw, mh_hdr.data_len);
-        if (res > 0) {
-          Debug("pmgmt", "received message ID %d", mh_full->msg_id);
-          handleMgmtMsgFromLM(mh_full);
-        }
-
-        if (res == 0) {
-          close_socket(local_manager_sockfd);
-          Alert("exiting with EOF from process manager");
-        }
-
-        if (res < 0) {
-          if (running) {
-            // If we are no longer running, the socket might have been closed 
out
-            // from under us. At any rate, we don't care any more.
-            Fatal("socket read error: %s", strerror(errno));
-          }
-        }
-
-        ats_free(mh_full);
-      }
-    }
+    Debug("pmgmt", "received message ID %d", msg->msg_id);
+    handleMgmtMsgFromLM(msg);
+    ats_free(msg);
   }
 
-  Debug("pmgmt", "[%s] enqueued %d of max %d messages in a row", __func__, 
count, max_msgs_in_a_row);
+  Debug("pmgmt", "enqueued %d of max %d messages in a row", count, 
max_msgs_in_a_row);
+  return 0;
 }
 
 void
diff --git a/mgmt/ProcessManager.h b/mgmt/ProcessManager.h
index e25b4ef..c9e27c1 100644
--- a/mgmt/ProcessManager.h
+++ b/mgmt/ProcessManager.h
@@ -65,12 +65,8 @@ public:
 
   void reconfigure();
   void initLMConnection();
-  void pollLMConnection();
   void handleMgmtMsgFromLM(MgmtMessageHdr *mh);
 
-  bool processEventQueue();
-  bool processSignalQueue();
-
   void
   registerPluginCallbacks(ConfigUpdateCbTable *_cbtable)
   {
@@ -78,6 +74,10 @@ public:
   }
 
 private:
+  int pollLMConnection();
+  int processSignalQueue();
+  bool processEventQueue();
+
   bool require_lm;
   RecInt timeout;
   LLQ *mgmt_signal_queue;

-- 
To stop receiving notification emails like this one, please contact
"commits@trafficserver.apache.org" <commits@trafficserver.apache.org>.

Reply via email to