Hi Thuan,

TipcPortId::Send is also called in a few other places. Do you think it would be good to add a wrapper around TipcPortId::Send that retries a few times on failure, say TipcPortId::TryToSend(), and to call TryToSend() instead of Send()?

Thanks

Minh

On 27/11/19 1:26 pm, thuan.tran wrote:
When overflow happens, MDS with flow control enabled may keep
all messages in the queue if it fails to send a message when receiving
a Nack or ChunkAck, since no further trigger comes after that.
MDS flow control should retry sending the message in this scenario.
---
  src/mds/mds_tipc_fctrl_portid.cc | 16 ++++++++++++----
  1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc
index 724eb7b7b..e6e179669 100644
--- a/src/mds/mds_tipc_fctrl_portid.cc
+++ b/src/mds/mds_tipc_fctrl_portid.cc
@@ -17,6 +17,7 @@
#include "mds/mds_tipc_fctrl_portid.h"
  #include "base/ncssysf_def.h"
+#include "base/osaf_time.h"
#include "mds/mds_dt.h"
  #include "mds/mds_log.h"
@@ -440,13 +441,14 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t 
chksize) {
      // try to send a few pending msg
      DataMessage* msg = nullptr;
      uint16_t send_msg_cnt = 0;
-    while (send_msg_cnt++ < chunk_size_) {
+    while (send_msg_cnt < chunk_size_) {
        // find the lowest sequence unsent yet
        msg = sndqueue_.FirstUnsent();
        if (msg == nullptr) {
          break;
        } else {
            if (Send(msg->msg_data_, msg->header_.msg_len_) == 
NCSCC_RC_SUCCESS) {
+            send_msg_cnt++;
              msg->is_sent_ = true;
              m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
                  "SndQData[fseq:%u, len:%u], "
@@ -455,7 +457,10 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t 
chksize) {
                  msg->header_.fseq_, msg->header_.msg_len_,
                  sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
            } else {
-            break;
+            // If not retry, all messages are kept in queue
+            // and no more trigger to send messages
+            osaf_nanosleep(&kTenMilliseconds);
+            continue;
            }
        }
      }
@@ -508,9 +513,12 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag,
    DataMessage* msg = sndqueue_.Find(Seq16(fseq));
    if (msg != nullptr) {
      // Resend the msg found
-    if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) {
-      msg->is_sent_ = true;
+    while (Send(msg->msg_data_, msg->header_.msg_len_) != NCSCC_RC_SUCCESS) {
+      // If not retry, all messages are kept in queue
+      // and no more trigger to send messages
+      osaf_nanosleep(&kTenMilliseconds);
      }
+    msg->is_sent_ = true;
      m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
          "RsndData[mseq:%u, mfrag:%u, fseq:%u], "
          "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",


_______________________________________________
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel

Reply via email to