This change introduces a lightweight list that keeps all invocations that
have not yet been acknowledged by the log server. If the server disappears
(headless cluster), the log agent notifies the log client of all lost
invocations with error code SA_AIS_ERR_TRY_AGAIN.
---
 src/log/agent/lga_agent.cc  |  2 ++
 src/log/agent/lga_client.cc |  8 +++++++-
 src/log/agent/lga_client.h  | 34 ++++++++++++++++++++++++++++++++++
 3 files changed, 43 insertions(+), 1 deletion(-)

diff --git a/src/log/agent/lga_agent.cc b/src/log/agent/lga_agent.cc
index 1000bb3fd..0049323c8 100644
--- a/src/log/agent/lga_agent.cc
+++ b/src/log/agent/lga_agent.cc
@@ -1296,6 +1296,8 @@ SaAisErrorT LogAgent::saLogWriteLogAsync(SaLogStreamHandleT logStreamHandle,
   if (NCSCC_RC_SUCCESS !=
       lga_mds_msg_async_send(&msg, MDS_SEND_PRIORITY_MEDIUM)) {
     ais_rc = SA_AIS_ERR_TRY_AGAIN;
+  } else {
+    client->KeepTrack(invocation, ackFlags);
   }
 
   return ais_rc;
diff --git a/src/log/agent/lga_client.cc b/src/log/agent/lga_client.cc
index 386c84929..cdc54904a 100644
--- a/src/log/agent/lga_client.cc
+++ b/src/log/agent/lga_client.cc
@@ -84,7 +84,9 @@ LogClient::~LogClient() {
   for (auto& stream : stream_list_) {
     if (stream != nullptr) delete stream;
   }
+
   stream_list_.clear();
+  unacked_invocations_.clear();
 
   // Free the client handle allocated to this log client
   if (handle_ != 0) {
@@ -129,9 +131,11 @@ void LogClient::InvokeCallback(const lgsv_msg_t* msg) {
   // Invoke the corresponding callback
   switch (cbk_info->type) {
     case LGSV_WRITE_LOG_CALLBACK_IND: {
-      if (callbacks_.saLogWriteLogCallback)
+      if (callbacks_.saLogWriteLogCallback) {
+        RemoveTrack(cbk_info->inv);
         callbacks_.saLogWriteLogCallback(cbk_info->inv,
                                          cbk_info->write_cbk.error);
+      }
     } break;
 
     case LGSV_SEVERITY_FILTER_CALLBACK: {
@@ -395,6 +399,8 @@ void LogClient::NoLogServer() {
     // When LOG server restart from headless, Log agent will do recover them.
     stream->SetRecoveryFlag(false);
   }
+
+  NotifyClientAboutLostInvocations();
 }
 
 uint32_t LogClient::SendMsgToMbx(lgsv_msg_t* msg, MDS_SEND_PRIORITY_TYPE prio) {
diff --git a/src/log/agent/lga_client.h b/src/log/agent/lga_client.h
index d7060cc13..f5fa6faa4 100644
--- a/src/log/agent/lga_client.h
+++ b/src/log/agent/lga_client.h
@@ -20,6 +20,7 @@
 
 #include <stdint.h>
 #include <vector>
+#include <list>
 #include <atomic>
 #include <saLog.h>
 #include "base/mutex.h"
@@ -169,6 +170,35 @@ class LogClient {
     return ref_counter_object_.RestoreRefCounter(caller, value, updated);
   }
 
+  // NOTE(review): no lock guards unacked_invocations_; confirm serialization.
+  // Remember @inv until the server acks it (SA_LOG_RECORD_WRITE_ACK only).
+  void KeepTrack(SaInvocationT inv, uint32_t ack_flags) {
+    if (ack_flags == SA_LOG_RECORD_WRITE_ACK)
+      unacked_invocations_.push_back(inv);
+  }
+
+  // The server acknowledged @inv, so it is no longer outstanding.
+  void RemoveTrack(SaInvocationT inv) { unacked_invocations_.remove(inv); }
+
+  // Queue a SA_AIS_ERR_TRY_AGAIN write callback for each outstanding @inv.
+  void NotifyClientAboutLostInvocations() {
+    for (const auto& inv : unacked_invocations_) {
+      TRACE("The write async with this invocation %llu has been lost", inv);
+      // Released by lga_msg_destroy() after the mailbox message is handled.
+      auto* msg = static_cast<lgsv_msg_t*>(calloc(1, sizeof(lgsv_msg_t)));
+      assert(msg && "Failed to allocate memory for lgsv_msg_t");
+      msg->type = LGSV_LGS_CBK_MSG;
+      msg->info.cbk_info.type = LGSV_WRITE_LOG_CALLBACK_IND;
+      msg->info.cbk_info.lgs_client_id = client_id_;
+      msg->info.cbk_info.write_cbk.error = SA_AIS_ERR_TRY_AGAIN;
+      msg->info.cbk_info.inv = inv;
+      // Not enqueued on failure, so the mailbox will never free it.
+      if (SendMsgToMbx(msg, MDS_SEND_PRIORITY_HIGH) != NCSCC_RC_SUCCESS)
+        free(msg);
+    }
+    unacked_invocations_.clear();
+  }
+
   // true if the client is successfully done recovery.
   // or the client has just borned.
   // Introduce this method to avoid locking the successful recovered client
@@ -256,6 +286,10 @@ class LogClient {
   // Hold all log streams belong to @this client
   std::vector<LogStreamInfo*> stream_list_;
 
+  // Invocations of write requests still awaiting an acknowledgement from the
+  // log server. If the cluster goes headless, the log agent reports each of
+  // them to the log client with SA_AIS_ERR_TRY_AGAIN.
+  std::list<SaInvocationT> unacked_invocations_{};
   // LOG handle (derived from hdl-mngr)
   SaLogHandleT handle_;
 
-- 
2.17.1



_______________________________________________
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel

Reply via email to