Thanks Minh. I have no more comments.

Regards, Vu

On 9/23/19 7:48 AM, Minh Hon Chau wrote:
Hi all,

Below is patch #10, which addresses most of the review comments; it applies on top of the current patch #9.

Patch #10 does not use shared_ptr and base::Mutex as suggested by Gary and Vu. The reason is that doing so would cause a problem similar to the one reported in #2860 (the user calls exit() without properly shutting down MDS), unless those variables are allocated on the heap.

I would like to push the #1960 patches today if we don't have any more comments. There are some other incremental improvements/fixes that will be addressed in other tickets.

Thanks

Minh

---
 src/mds/README                   |  2 +-
 src/mds/mds_dt_tipc.c            | 28 ++++++++++++-----
 src/mds/mds_tipc_fctrl_intf.cc   | 67 ++++++++++++++++++++++++++--------------
 src/mds/mds_tipc_fctrl_intf.h    |  2 +-
 src/mds/mds_tipc_fctrl_msg.cc    | 44 +++++++++++++-------------
 src/mds/mds_tipc_fctrl_msg.h     | 22 +++++++++++--
 src/mds/mds_tipc_fctrl_portid.cc | 46 ++++++++++++++++-----------
 7 files changed, 137 insertions(+), 74 deletions(-)

diff --git a/src/mds/README b/src/mds/README
index 1b94632..0819bdc 100644
--- a/src/mds/README
+++ b/src/mds/README
@@ -182,7 +182,7 @@ TIPC portid state machine and its transition
 --------------------------------------------
 kDisabled, // no flow control support at this state
 kStartup,  // a newly published portid starts at this state
-kTxProb,   // txprob timer is running to confirm if the flow control is supported
+kTxProb,   // tx probation timer is running to confirm if the flow control is supported
 kEnabled   // flow control support is confirmed, data flow is controlled
 kRcvBuffOverflow // anticipating (or experienced) the receiver's buffer overflow

diff --git a/src/mds/mds_dt_tipc.c b/src/mds/mds_dt_tipc.c
index 1b6c3f8..e7a7b48 100644
--- a/src/mds/mds_dt_tipc.c
+++ b/src/mds/mds_dt_tipc.c
@@ -247,6 +247,7 @@ uint32_t mdtm_tipc_init(NODE_ID nodeid, uint32_t *mds_tipc_ref)
     if (!get_tipc_port_id(tipc_cb.BSRsock, &port_id)) {
         close(tipc_cb.Dsock);
         close(tipc_cb.BSRsock);
+        *mds_tipc_ref = 0;
         return NCSCC_RC_FAILURE;
     }
     *mds_tipc_ref = port_id.ref;
@@ -330,7 +331,7 @@ uint32_t mdtm_tipc_init(NODE_ID nodeid, uint32_t *mds_tipc_ref)
     }

     /* Get tipc socket receive buffer size */
-    int optval;
+    int optval = 0;
     socklen_t optlen = sizeof(optval);
     if (getsockopt(tipc_cb.BSRsock, SOL_SOCKET, SO_RCVBUF,
         &optval, &optlen) != 0) {
@@ -350,12 +351,25 @@ uint32_t mdtm_tipc_init(NODE_ID nodeid, uint32_t *mds_tipc_ref)
             int acksize = -1;
             if ((ptr = getenv("MDS_TIPC_FCTRL_ACKTIMEOUT")) != NULL) {
                 ackto = atoi(ptr);
+                if (ackto == 0) {
+                    syslog(LOG_ERR, "MDTM:TIPC Invalid "
+                            "MDS_TIPC_FCTRL_ACKTIMEOUT, using default value");
+                    ackto = -1;
+                }
             }
             if ((ptr = getenv("MDS_TIPC_FCTRL_ACKSIZE")) != NULL) {
                 acksize = atoi(ptr);
+                if (acksize == 0) {
+                    syslog(LOG_ERR, "MDTM:TIPC Invalid "
+                            "MDS_TIPC_FCTRL_ACKSIZE, using default value");
+                    acksize = -1;
+                }
             }
-            mds_tipc_fctrl_initialize(tipc_cb.BSRsock, port_id, (uint64_t)optval,
+            mds_tipc_fctrl_initialize(tipc_cb.BSRsock, port_id, optval,
                 ackto, acksize, tipc_mcast_enabled);
+        } else {
+            syslog(LOG_ERR, "MDTM:TIPC Invalid value of"
+                "MDS_TIPC_FCTRL_ENABLED");
         }
     }

@@ -366,6 +380,7 @@ uint32_t mdtm_tipc_init(NODE_ID nodeid, uint32_t *mds_tipc_ref)
         close(tipc_cb.Dsock);
         close(tipc_cb.BSRsock);
         m_NCS_IPC_RELEASE(&tipc_cb.tmr_mbx, NULL);
+        mds_tipc_fctrl_shutdown();
         return NCSCC_RC_FAILURE;
     }

@@ -2528,7 +2543,7 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ *req)
      */
     uint32_t status = 0;
     uint32_t sum_mds_hdr_plus_mdtm_hdr_plus_len;
-  uint16_t fctrl_seq_num = 0;
+    uint16_t fctrl_seq_num = 0;
     int version = req->msg_arch_word & 0x7;
     if (version > 1) {
         sum_mds_hdr_plus_mdtm_hdr_plus_len =
@@ -2618,7 +2633,7 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ *req)
                 return NCSCC_RC_FAILURE;
             }
           /* if sndqueue is capable, then obtain the current sending seq */
-          if (mds_tipc_fctrl_sndqueue_capable(tipc_id, len, &fctrl_seq_num)
+          if (mds_tipc_fctrl_sndqueue_capable(tipc_id, &fctrl_seq_num)
               == NCSCC_RC_FAILURE){
             m_MDS_LOG_ERR("FCTRL: Failed to send message len :%d", len);
             return NCSCC_RC_FAILURE;
@@ -2717,10 +2732,10 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ *req)
                 }
                 /* if sndqueue is capable, then obtain the current sending seq */
                 if (mds_tipc_fctrl_sndqueue_capable(tipc_id,
-                    len + sum_mds_hdr_plus_mdtm_hdr_plus_len,
                     &fctrl_seq_num) == NCSCC_RC_FAILURE){
                     m_MDS_LOG_ERR("FCTRL: Failed to send message len :%d",
                         len + sum_mds_hdr_plus_mdtm_hdr_plus_len);
+                    m_MMGR_FREE_BUFR_LIST(usrbuf);
                     free(body);
                     return NCSCC_RC_FAILURE;
                 }
@@ -2828,7 +2843,6 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ *req)
             }
             /* if sndqueue is capable, then obtain the current sending seq */
             if (mds_tipc_fctrl_sndqueue_capable(tipc_id,
-                req->msg.data.buff_info.len + sum_mds_hdr_plus_mdtm_hdr_plus_len,
                 &fctrl_seq_num) == NCSCC_RC_FAILURE) {
                 m_MDS_LOG_ERR("FCTRL: Failed to send message len :%d",
                     req->msg.data.buff_info.len + sum_mds_hdr_plus_mdtm_hdr_plus_len);
@@ -2999,7 +3013,7 @@ uint32_t mdtm_frag_and_send(MDTM_SEND_REQ *req, uint32_t seq_num,
                 }
             }
             /* if sndqueue is capable, then obtain the current sending seq */
-            if (mds_tipc_fctrl_sndqueue_capable(id, len_buf, &fctrl_seq_num)
+            if (mds_tipc_fctrl_sndqueue_capable(id, &fctrl_seq_num)
                 == NCSCC_RC_FAILURE) {
                 m_MDS_LOG_ERR("FCTRL: Failed to send message len :%d", len_buf);
                 m_MMGR_FREE_BUFR_LIST(usrbuf);
diff --git a/src/mds/mds_tipc_fctrl_intf.cc b/src/mds/mds_tipc_fctrl_intf.cc
index 8949937..2366672 100644
--- a/src/mds/mds_tipc_fctrl_intf.cc
+++ b/src/mds/mds_tipc_fctrl_intf.cc
@@ -70,7 +70,7 @@ std::map<uint64_t, TipcPortId*> portid_map;
 std::mutex portid_map_mutex;

 // probation timer event to enable flow control at receivers
-const int64_t kBaseTimerInt = 200;  // in centisecond
+const int64_t kBaseTimerInt = 100;  // in centisecond
 const uint8_t kTxProbMaxRetries = 10;
 Timer txprob_timer(Event::Type::kEvtTmrTxProb);

@@ -97,7 +97,7 @@ void tmr_exp_cbk(void* uarg) {
   }
 }

-void process_timer_event(const Event evt) {
+void process_timer_event(const Event& evt) {
   bool txprob_restart = false;
   for (auto i : portid_map) {
     TipcPortId* portid = i.second;
@@ -118,18 +118,18 @@ void process_timer_event(const Event evt) {
   }
 }

-uint32_t process_flow_event(const Event evt) {
+uint32_t process_flow_event(const Event& evt) {
   uint32_t rc = NCSCC_RC_SUCCESS;
   TipcPortId *portid = portid_lookup(evt.id_);
   if (portid == nullptr) {
     if (evt.type_ == Event::Type::kEvtRcvData) {
-      portid = new TipcPortId(evt.id_, data_sock_fd, kChunkAckSize,
-          sock_buf_size);
+      portid = new TipcPortId(evt.id_, data_sock_fd,
+          kChunkAckSize, sock_buf_size);
       portid_map[TipcPortId::GetUniqueId(evt.id_)] = portid;
-      if (evt.type_ == Event::Type::kEvtRcvData) {
-        rc = portid->ReceiveData(evt.mseq_, evt.mfrag_,
+      rc = portid->ReceiveData(evt.mseq_, evt.mfrag_,
             evt.fseq_, evt.svc_id_);
-      }
+    } else {
+      m_MDS_LOG_ERR("PortId not found for evt:%d", (int)evt.type_);
     }
   } else {
     if (evt.type_ == Event::Type::kEvtRcvData) {
@@ -154,13 +154,14 @@ uint32_t process_all_events(void) {
   enum { FD_FCTRL = 0, NUM_FDS };

   int poll_tmo = kChunkAckTimeout;
+  struct pollfd pfd[NUM_FDS] = {{0, 0, 0}};
+
+  pfd[FD_FCTRL].fd =
+      ncs_ipc_get_sel_obj(&mbx_events).rmv_obj;
+  pfd[FD_FCTRL].events = POLLIN;
+
   while (true) {
     int pollres;
-    struct pollfd pfd[NUM_FDS] = {{0}};
-
-    pfd[FD_FCTRL].fd =
-        ncs_ipc_get_sel_obj(&mbx_events).rmv_obj;
-    pfd[FD_FCTRL].events = POLLIN;

     pollres = poll(pfd, NUM_FDS, poll_tmo);

@@ -172,13 +173,13 @@ uint32_t process_all_events(void) {

     if (pollres > 0) {
       if (pfd[FD_FCTRL].revents == POLLIN) {
+        portid_map_mutex.lock();
+
         Event *evt = reinterpret_cast<Event*>(ncs_ipc_non_blk_recv(
             &mbx_events));

         if (evt == nullptr) continue;

-        portid_map_mutex.lock();
-
         if (evt->IsTimerEvent()) {
           process_timer_event(*evt);
         }
@@ -212,12 +213,14 @@ uint32_t create_ncs_task(void *task_hdl) {
   }
   if (m_NCS_IPC_ATTACH(&mbx_events) != NCSCC_RC_SUCCESS) {
     m_MDS_LOG_ERR("m_NCS_IPC_ATTACH failed");
+    m_NCS_IPC_RELEASE(&mbx_events, nullptr);
     return NCSCC_RC_FAILURE;
   }
   if (ncs_task_create((NCS_OS_CB)process_all_events, 0,
           "OSAF_MDS", prio_val, policy, NCS_MDTM_STACKSIZE,
           &task_hdl) != NCSCC_RC_SUCCESS) {
-    m_MDS_LOG_ERR("FCTRL: Task Creation-failed:\n");
+    m_MDS_LOG_ERR("FCTRL: ncs_task_create() failed\n");
+    m_NCS_IPC_RELEASE(&mbx_events, nullptr);
     return NCSCC_RC_FAILURE;
   }

@@ -230,18 +233,20 @@ uint32_t create_ncs_task(void *task_hdl) {
 uint32_t mds_tipc_fctrl_initialize(int dgramsock, struct tipc_portid id,
     uint64_t rcv_buf_size, int32_t ackto, int32_t acksize,
     bool mcast_enabled) {
-  if (create_ncs_task(&p_task_hdl) !=
-      NCSCC_RC_SUCCESS) {
-    m_MDS_LOG_ERR("FCTRL: Start of the Created Task-failed:\n");
-    return NCSCC_RC_FAILURE;
-  }
+
   data_sock_fd = dgramsock;
   snd_rcv_portid = id;
   sock_buf_size = rcv_buf_size;
-  is_fctrl_enabled = true;
   is_mcast_enabled = mcast_enabled;
   if (ackto != -1) kChunkAckTimeout = ackto;
   if (acksize != -1) kChunkAckSize = acksize;
+
+  if (create_ncs_task(&p_task_hdl) !=
+      NCSCC_RC_SUCCESS) {
+    m_MDS_LOG_ERR("FCTRL: create_ncs_task() failed\n");
+    return NCSCC_RC_FAILURE;
+  }
+  is_fctrl_enabled = true;
   m_MDS_LOG_NOTIFY("FCTRL: Initialize [node:%x, ref:%u]",
       id.node, id.ref);

@@ -250,13 +255,29 @@ uint32_t mds_tipc_fctrl_initialize(int dgramsock, struct tipc_portid id,

 uint32_t mds_tipc_fctrl_shutdown(void) {
   if (is_fctrl_enabled == false) return NCSCC_RC_SUCCESS;
+
+  portid_map_mutex.lock();
+
   if (ncs_task_release(p_task_hdl) != NCSCC_RC_SUCCESS) {
     m_MDS_LOG_ERR("FCTRL: Stop of the Created Task-failed:\n");
   }
+
+  m_NCS_IPC_DETACH(&mbx_events, nullptr, nullptr);
+  m_NCS_IPC_RELEASE(&mbx_events, nullptr);
+
+  for (auto i : portid_map) delete i.second;
+  portid_map.clear();
+
+  portid_map_mutex.unlock();
+  is_fctrl_enabled = false;
+
+  m_MDS_LOG_NOTIFY("FCTRL: Shutdown [node:%x, ref:%u]",
+      snd_rcv_portid.node, snd_rcv_portid.ref);
+
   return NCSCC_RC_SUCCESS;
 }

-uint32_t mds_tipc_fctrl_sndqueue_capable(struct tipc_portid id, uint16_t len,
+uint32_t mds_tipc_fctrl_sndqueue_capable(struct tipc_portid id,
           uint16_t* next_seq) {
   if (is_fctrl_enabled == false) return NCSCC_RC_SUCCESS;

diff --git a/src/mds/mds_tipc_fctrl_intf.h b/src/mds/mds_tipc_fctrl_intf.h
index c798b93..ed9c6a8 100644
--- a/src/mds/mds_tipc_fctrl_intf.h
+++ b/src/mds/mds_tipc_fctrl_intf.h
@@ -37,7 +37,7 @@ uint32_t mds_tipc_fctrl_portid_down(struct tipc_portid id, uint32_t type);
 uint32_t mds_tipc_fctrl_portid_terminate(struct tipc_portid id);
 uint32_t mds_tipc_fctrl_drop_data(uint8_t *buffer, uint16_t len,
     struct tipc_portid id);
-uint32_t mds_tipc_fctrl_sndqueue_capable(struct tipc_portid id, uint16_t len,
+uint32_t mds_tipc_fctrl_sndqueue_capable(struct tipc_portid id,
     uint16_t* next_seq);
 uint32_t mds_tipc_fctrl_trysend(const uint8_t *buffer, uint16_t len,
     struct tipc_portid id);
diff --git a/src/mds/mds_tipc_fctrl_msg.cc b/src/mds/mds_tipc_fctrl_msg.cc
index abd38d3..064d977 100644
--- a/src/mds/mds_tipc_fctrl_msg.cc
+++ b/src/mds/mds_tipc_fctrl_msg.cc
@@ -31,17 +31,17 @@ void HeaderMessage::Encode(uint8_t *msg) {
   uint8_t *ptr;

   // encode message length
-  ptr = &msg[0];
+  ptr = &msg[HeaderMessage::FieldIndex::kMessageLength];
   ncs_encode_16bit(&ptr, msg_len_);
   // encode sequence number
-  ptr = &msg[2];
+  ptr = &msg[HeaderMessage::FieldIndex::kSequenceNumber];
   ncs_encode_32bit(&ptr, mseq_);
-  // encode sequence number
-  ptr = &msg[6];
+  // encode fragment number
+  ptr = &msg[HeaderMessage::FieldIndex::kFragmentNumber];
   ncs_encode_16bit(&ptr, mfrag_);
   // skip length_check: oct8&9
   // encode protocol version
-  ptr = &msg[10];
+  ptr = &msg[HeaderMessage::FieldIndex::kProtocolVersion];
   ncs_encode_8bit(&ptr, MDS_PROT_FCTRL);
 }

@@ -49,32 +49,32 @@ void HeaderMessage::Decode(uint8_t *msg) {
   uint8_t *ptr;

   // decode message length
-  ptr = &msg[0];
+  ptr = &msg[HeaderMessage::FieldIndex::kMessageLength];
   msg_len_ = ncs_decode_16bit(&ptr);
   // decode sequence number
-  ptr = &msg[2];
+  ptr = &msg[HeaderMessage::FieldIndex::kSequenceNumber];
   mseq_ = ncs_decode_32bit(&ptr);
   // decode fragment number
-  ptr = &msg[6];
+  ptr = &msg[HeaderMessage::FieldIndex::kFragmentNumber];
   mfrag_ = ncs_decode_16bit(&ptr);
   // decode protocol version
-  ptr = &msg[10];
+  ptr = &msg[HeaderMessage::FieldIndex::kProtocolVersion];
   pro_ver_ = ncs_decode_8bit(&ptr);
   if ((pro_ver_ & MDS_PROT_VER_MASK) == MDS_PROT_FCTRL) {
     // decode flow control sequence number
-    ptr = &msg[8];
+    ptr = &msg[HeaderMessage::FieldIndex::kFlowControlSequenceNumber];
     fseq_ = ncs_decode_16bit(&ptr);
     // decode protocol identifier
-    ptr = &msg[11];
+    ptr = &msg[ChunkAck::FieldIndex::kProtocolIdentifier];
     pro_id_ = ncs_decode_32bit(&ptr);
     if (pro_id_ == MDS_PROT_FCTRL_ID) {
       // decode message type
-      ptr = &msg[15];
+      ptr = &msg[ChunkAck::FieldIndex::kFlowControlMessageType];
       msg_type_ = ncs_decode_8bit(&ptr);
     }
   } else {
     if (mfrag_ != 0) {
-      ptr = &msg[8];
+      ptr = &msg[HeaderMessage::FieldIndex::kFlowControlSequenceNumber];
       fseq_ = ncs_decode_16bit(&ptr);
       if (fseq_ != 0) pro_ver_ = MDS_PROT_FCTRL;
     }
@@ -90,7 +90,7 @@ void DataMessage::Decode(uint8_t *msg) {
              MDS_HEADER_RCVR_SVC_ID_POSITION];
   svc_id_ = ncs_decode_16bit(&ptr);
   // decode snd_type
-  ptr = &msg[17];
+  ptr = &msg[DataMessage::FieldIndex::kSendType];
   snd_type_ = (ncs_decode_8bit(&ptr)) & 0x3f;
 }

@@ -109,19 +109,19 @@ ChunkAck::ChunkAck(uint16_t svc_id, uint16_t fseq, uint16_t chunk_size):
 void ChunkAck::Encode(uint8_t *msg) {
   uint8_t *ptr;
   // encode protocol identifier
-  ptr = &msg[11];
+  ptr = &msg[ChunkAck::FieldIndex::kProtocolIdentifier];
   ncs_encode_32bit(&ptr, MDS_PROT_FCTRL_ID);
   // encode message type
-  ptr = &msg[15];
+  ptr = &msg[ChunkAck::FieldIndex::kFlowControlMessageType];
   ncs_encode_8bit(&ptr, kChunkAckMsgType);
   // encode service id
-  ptr = &msg[16];
+  ptr = &msg[ChunkAck::FieldIndex::kServiceId];
   ncs_encode_16bit(&ptr, svc_id_);
   // encode flow control sequence number
-  ptr = &msg[18];
+  ptr = &msg[ChunkAck::FieldIndex::kFlowControlSequenceNumber];
   ncs_encode_16bit(&ptr, acked_fseq_);
   // encode chunk size
-  ptr = &msg[20];
+  ptr = &msg[ChunkAck::FieldIndex::kChunkAckSize];
   ncs_encode_16bit(&ptr, chunk_size_);
 }

@@ -129,13 +129,13 @@ void ChunkAck::Decode(uint8_t *msg) {
   uint8_t *ptr;

   // decode service id
-  ptr = &msg[16];
+  ptr = &msg[ChunkAck::FieldIndex::kServiceId];
   svc_id_ = ncs_decode_16bit(&ptr);
   // decode flow control sequence number
-  ptr = &msg[18];
+  ptr = &msg[ChunkAck::FieldIndex::kFlowControlSequenceNumber];
   acked_fseq_ = ncs_decode_16bit(&ptr);
   // decode chunk size
-  ptr = &msg[20];
+  ptr = &msg[ChunkAck::FieldIndex::kChunkAckSize];
   chunk_size_ = ncs_decode_16bit(&ptr);
 }

diff --git a/src/mds/mds_tipc_fctrl_msg.h b/src/mds/mds_tipc_fctrl_msg.h
index e6b9662..d67ed19 100644
--- a/src/mds/mds_tipc_fctrl_msg.h
+++ b/src/mds/mds_tipc_fctrl_msg.h
@@ -71,8 +71,8 @@ class Event {
     fseq_(f_seg_num), chunk_size_(chunk_size) {
     type_ = type;
   }
-  bool IsTimerEvent() { return (type_ > Type::kEvtTmrAll); }
-  bool IsFlowEvent() {
+  bool IsTimerEvent() const { return (type_ > Type::kEvtTmrAll); }
+  bool IsFlowEvent() const {
     return (Type::kEvtDataFlowAll < type_ && type_ < Type::kEvtTmrAll);
   }
 };
@@ -86,6 +86,14 @@ class BaseMessage {

 class HeaderMessage: public BaseMessage {
  public:
+  enum FieldIndex {
+    kMessageLength = 0,
+    kSequenceNumber = 2,
+    kFragmentNumber = 6,
+    kLengthCheck = 8,
+    kFlowControlSequenceNumber = kLengthCheck,  // reuse kLengthCheck
+    kProtocolVersion = 10
+  };
   uint8_t* msg_ptr_{nullptr};
   uint16_t msg_len_{0};
   uint32_t mseq_{0};
@@ -104,6 +112,9 @@ class HeaderMessage: public BaseMessage {

 class DataMessage: public BaseMessage {
  public:
+  enum FieldIndex {
+    kSendType = 17,
+  };
   HeaderMessage header_;
   uint16_t svc_id_{0};

@@ -118,6 +129,13 @@ class DataMessage: public BaseMessage {

 class ChunkAck: public BaseMessage {
  public:
+  enum FieldIndex {
+    kProtocolIdentifier = 11,
+    kFlowControlMessageType = 15,
+    kServiceId = 16,
+    kFlowControlSequenceNumber = 18,
+    kChunkAckSize = 20
+  };
   static const uint8_t kChunkAckMsgType = 1;
   static const uint16_t kChunkAckMsgLength = 22;

diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc
index 365d72f..1ce792d 100644
--- a/src/mds/mds_tipc_fctrl_portid.cc
+++ b/src/mds/mds_tipc_fctrl_portid.cc
@@ -30,6 +30,7 @@ Timer::Timer(Event::Type type) {
 }

 Timer::~Timer() {
+  // do not call Stop
 }

 void Timer::Start(int64_t period, void (*tmr_exp_func)(void*)) {
@@ -57,8 +58,8 @@ void MessageQueue::Queue(DataMessage* msg) {
 }

 DataMessage* MessageQueue::Find(uint32_t mseq, uint16_t mfrag) {
-  for (auto it = queue_.begin(); it != queue_.end(); ++it) {
-    DataMessage *m = *it;
+  for (const auto& it : queue_) {
+    DataMessage *m = it;
     if (m->header_.mseq_ == mseq && m->header_.mfrag_ == mfrag) {
       return m;
     }
@@ -83,8 +84,8 @@ uint64_t MessageQueue::Erase(Seq16 fseq_from, Seq16 fseq_to) {
 }

 DataMessage* MessageQueue::FirstUnsent() {
-  for (auto it = queue_.begin(); it != queue_.end(); ++it) {
-    DataMessage *m = *it;
+  for (const auto& it : queue_) {
+    DataMessage *m = it;
     if (m->is_sent_ == false) {
       return m;
     }
@@ -93,8 +94,8 @@ DataMessage* MessageQueue::FirstUnsent() {
 }

 void MessageQueue::MarkUnsentFrom(Seq16 fseq) {
-  for (auto it = queue_.begin(); it != queue_.end(); ++it) {
-    DataMessage *m = *it;
+  for (const auto& it : queue_) {
+    DataMessage *m = it;
     if (Seq16(m->header_.fseq_) >= fseq) m->is_sent_ = false;
   }
 }
@@ -118,7 +119,6 @@ TipcPortId::~TipcPortId() {
   ReceiveTmrChunkAck();
   // flush all unsent msg in sndqueue_
   FlushData();
-  sndqueue_.Clear();
 }

 uint64_t TipcPortId::GetUniqueId(struct tipc_portid id) {
@@ -143,6 +143,7 @@ void TipcPortId::FlushData() {
           sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
     }
   } while (msg != nullptr);
+  sndqueue_.Clear();
 }

 uint32_t TipcPortId::Send(uint8_t* data, uint16_t length) {
@@ -203,10 +204,20 @@ bool TipcPortId::ReceiveCapable(uint16_t sending_len) {
   if (sndwnd_.nacked_space_ + sending_len < rcv_buf_size_) {
     return true;
   } else {
-    state_ = State::kRcvBuffOverflow;
-    m_MDS_LOG_DBG("FCTRL: [node:%x, ref:%u] --> Overflow, %" PRIu64
-        ", %u, %" PRIu64, id_.node, id_.ref, sndwnd_.nacked_space_,
-        sending_len, rcv_buf_size_);
+    if (state_ == State::kTxProb) {
+      // Too many msgs are not acked by receiver while in txprob state
+      // disable flow control
+      state_ = State::kDisabled;
+      m_MDS_LOG_ERR("FCTRL: [node:%x, ref:%u] --> Disabled, %" PRIu64
+          ", %u, %" PRIu64, id_.node, id_.ref, sndwnd_.nacked_space_,
+          sending_len, rcv_buf_size_);
+      return true;
+    } else if (state_ == State::kEnabled) {
+      state_ = State::kRcvBuffOverflow;
+      m_MDS_LOG_NOTIFY("FCTRL: [node:%x, ref:%u] --> Overflow, %" PRIu64
+          ", %u, %" PRIu64, id_.node, id_.ref, sndwnd_.nacked_space_,
+          sending_len, rcv_buf_size_);
+    }
     return false;
   }
 }
@@ -242,7 +253,7 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
   // update state
   if (state_ == State::kTxProb || state_ == State::kStartup) {
     state_ = State::kEnabled;
-    m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
+    m_MDS_LOG_NOTIFY("FCTRL: [me] <-- [node:%x, ref:%u], "
         "RcvData, TxProb[retries:%u, state:%u]",
         id_.node, id_.ref,
         txprob_cnt_, (uint8_t)state_);
@@ -331,7 +342,7 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) {
   // update state
   if (state_ == State::kTxProb) {
     state_ = State::kEnabled;
-    m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
+    m_MDS_LOG_NOTIFY("FCTRL: [me] <-- [node:%x, ref:%u], "
         "RcvChkAck, "
         "TxProb[retries:%u, state:%u]",
         id_.node, id_.ref,
@@ -358,7 +369,7 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) {
     sndwnd_.nacked_space_ -= acked_bytes;

     // try to send a few pending msg
-    DataMessage* msg;
+    DataMessage* msg = nullptr;
     uint64_t resend_bytes = 0;
     while (resend_bytes < acked_bytes) {
       // find the lowest sequence unsent yet
@@ -386,7 +397,7 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) {
     // no more unsent message, back to kEnabled
     if (msg == nullptr && state_ == State::kRcvBuffOverflow) {
       state_ = State::kEnabled;
-      m_MDS_LOG_DBG("FCTRL: [node:%x, ref:%u] Overflow --> Enabled ",
+      m_MDS_LOG_NOTIFY("FCTRL: [node:%x, ref:%u] Overflow --> Enabled ",
           id_.node, id_.ref);
     }
   } else {
@@ -423,7 +434,7 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag,
   }
   if (state_ != State::kRcvBuffOverflow) {
     state_ = State::kRcvBuffOverflow;
-    m_MDS_LOG_DBG("FCTRL: [node:%x, ref:%u] --> Overflow ",
+    m_MDS_LOG_NOTIFY("FCTRL: [node:%x, ref:%u] --> Overflow ",
         id_.node, id_.ref);
     sndqueue_.MarkUnsentFrom(Seq16(fseq));
   }
@@ -466,10 +477,9 @@ bool TipcPortId::ReceiveTmrTxProb(uint8_t max_txprob) {
     // receiver is at old mds version
     if (state_ == State::kDisabled) {
       FlushData();
-      sndqueue_.Clear();
     }

-    m_MDS_LOG_DBG("FCTRL: [node:%x, ref:%u], "
+    m_MDS_LOG_NOTIFY("FCTRL: [node:%x, ref:%u], "
         "TxProbExp, TxProb[retries:%u, state:%u]",
         id_.node, id_.ref,
         txprob_cnt_, (uint8_t)state_);



_______________________________________________
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel

Reply via email to