Hi all,

Below is patch #10, which addresses most of the review comments; it applies on top of the current patch #9.

This patch #10 does not use shared_ptr and base::Mutex as suggested by Gary and Vu, because doing so would cause a problem similar to the one reported in #2860 (the user calls exit() without properly shutting down MDS), unless those variables are allocated on the heap.

I would like to push the #1960 patches today if there are no more comments. Some other incremental improvements/fixes will be addressed in separate tickets.

Thanks

Minh

---
 src/mds/README                   |  2 +-
 src/mds/mds_dt_tipc.c            | 28 ++++++++++++-----
 src/mds/mds_tipc_fctrl_intf.cc   | 67 ++++++++++++++++++++++++++--------------
 src/mds/mds_tipc_fctrl_intf.h    |  2 +-
 src/mds/mds_tipc_fctrl_msg.cc    | 44 +++++++++++++-------------
 src/mds/mds_tipc_fctrl_msg.h     | 22 +++++++++++--
 src/mds/mds_tipc_fctrl_portid.cc | 46 ++++++++++++++++-----------
 7 files changed, 137 insertions(+), 74 deletions(-)

diff --git a/src/mds/README b/src/mds/README
index 1b94632..0819bdc 100644
--- a/src/mds/README
+++ b/src/mds/README
@@ -182,7 +182,7 @@ TIPC portid state machine and its transition
 --------------------------------------------
 kDisabled, // no flow control support at this state
 kStartup,  // a newly published portid starts at this state
-kTxProb,   // txprob timer is running to confirm if the flow control is supported +kTxProb,   // tx probation timer is running to confirm if the flow control is supported
 kEnabled   // flow control support is confirmed, data flow is controlled
 kRcvBuffOverflow // anticipating (or experienced) the receiver's buffer overflow

diff --git a/src/mds/mds_dt_tipc.c b/src/mds/mds_dt_tipc.c
index 1b6c3f8..e7a7b48 100644
--- a/src/mds/mds_dt_tipc.c
+++ b/src/mds/mds_dt_tipc.c
@@ -247,6 +247,7 @@ uint32_t mdtm_tipc_init(NODE_ID nodeid, uint32_t *mds_tipc_ref)
     if (!get_tipc_port_id(tipc_cb.BSRsock, &port_id)) {
         close(tipc_cb.Dsock);
         close(tipc_cb.BSRsock);
+        *mds_tipc_ref = 0;
         return NCSCC_RC_FAILURE;
     }
     *mds_tipc_ref = port_id.ref;
@@ -330,7 +331,7 @@ uint32_t mdtm_tipc_init(NODE_ID nodeid, uint32_t *mds_tipc_ref)
     }

     /* Get tipc socket receive buffer size */
-    int optval;
+    int optval = 0;
     socklen_t optlen = sizeof(optval);
     if (getsockopt(tipc_cb.BSRsock, SOL_SOCKET, SO_RCVBUF,
         &optval, &optlen) != 0) {
@@ -350,12 +351,25 @@ uint32_t mdtm_tipc_init(NODE_ID nodeid, uint32_t *mds_tipc_ref)
             int acksize = -1;
             if ((ptr = getenv("MDS_TIPC_FCTRL_ACKTIMEOUT")) != NULL) {
                 ackto = atoi(ptr);
+                if (ackto == 0) {
+                    syslog(LOG_ERR, "MDTM:TIPC Invalid "
+                            "MDS_TIPC_FCTRL_ACKTIMEOUT, using default value");
+                    ackto = -1;
+                }
             }
             if ((ptr = getenv("MDS_TIPC_FCTRL_ACKSIZE")) != NULL) {
                 acksize = atoi(ptr);
+                if (acksize == 0) {
+                    syslog(LOG_ERR, "MDTM:TIPC Invalid "
+                            "MDS_TIPC_FCTRL_ACKSIZE, using default value");
+                    acksize = -1;
+                }
             }
-            mds_tipc_fctrl_initialize(tipc_cb.BSRsock, port_id, (uint64_t)optval,
+            mds_tipc_fctrl_initialize(tipc_cb.BSRsock, port_id, optval,
                 ackto, acksize, tipc_mcast_enabled);
+        } else {
+            syslog(LOG_ERR, "MDTM:TIPC Invalid value of"
+                "MDS_TIPC_FCTRL_ENABLED");
         }
     }

@@ -366,6 +380,7 @@ uint32_t mdtm_tipc_init(NODE_ID nodeid, uint32_t *mds_tipc_ref)
         close(tipc_cb.Dsock);
         close(tipc_cb.BSRsock);
         m_NCS_IPC_RELEASE(&tipc_cb.tmr_mbx, NULL);
+        mds_tipc_fctrl_shutdown();
         return NCSCC_RC_FAILURE;
     }

@@ -2528,7 +2543,7 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ *req)
      */
     uint32_t status = 0;
     uint32_t sum_mds_hdr_plus_mdtm_hdr_plus_len;
-  uint16_t fctrl_seq_num = 0;
+    uint16_t fctrl_seq_num = 0;
     int version = req->msg_arch_word & 0x7;
     if (version > 1) {
         sum_mds_hdr_plus_mdtm_hdr_plus_len =
@@ -2618,7 +2633,7 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ *req)
                 return NCSCC_RC_FAILURE;
             }
           /* if sndqueue is capable, then obtain the current sending seq */
-          if (mds_tipc_fctrl_sndqueue_capable(tipc_id, len, &fctrl_seq_num)
+          if (mds_tipc_fctrl_sndqueue_capable(tipc_id, &fctrl_seq_num)
               == NCSCC_RC_FAILURE){
             m_MDS_LOG_ERR("FCTRL: Failed to send message len :%d", len);
             return NCSCC_RC_FAILURE;
@@ -2717,10 +2732,10 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ *req)
                 }
                 /* if sndqueue is capable, then obtain the current sending seq */
                 if (mds_tipc_fctrl_sndqueue_capable(tipc_id,
-                    len + sum_mds_hdr_plus_mdtm_hdr_plus_len,
                     &fctrl_seq_num) == NCSCC_RC_FAILURE){
                     m_MDS_LOG_ERR("FCTRL: Failed to send message len :%d",
                         len + sum_mds_hdr_plus_mdtm_hdr_plus_len);
+                    m_MMGR_FREE_BUFR_LIST(usrbuf);
                     free(body);
                     return NCSCC_RC_FAILURE;
                 }
@@ -2828,7 +2843,6 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ *req)
             }
             /* if sndqueue is capable, then obtain the current sending seq */
             if (mds_tipc_fctrl_sndqueue_capable(tipc_id,
-                req->msg.data.buff_info.len + sum_mds_hdr_plus_mdtm_hdr_plus_len,
                 &fctrl_seq_num) == NCSCC_RC_FAILURE) {
                 m_MDS_LOG_ERR("FCTRL: Failed to send message len :%d",
                     req->msg.data.buff_info.len + sum_mds_hdr_plus_mdtm_hdr_plus_len); @@ -2999,7 +3013,7 @@ uint32_t mdtm_frag_and_send(MDTM_SEND_REQ *req, uint32_t seq_num,
                 }
             }
             /* if sndqueue is capable, then obtain the current sending seq */ -            if (mds_tipc_fctrl_sndqueue_capable(id, len_buf, &fctrl_seq_num)
+            if (mds_tipc_fctrl_sndqueue_capable(id, &fctrl_seq_num)
                 == NCSCC_RC_FAILURE) {
                 m_MDS_LOG_ERR("FCTRL: Failed to send message len :%d", len_buf);
                 m_MMGR_FREE_BUFR_LIST(usrbuf);
diff --git a/src/mds/mds_tipc_fctrl_intf.cc b/src/mds/mds_tipc_fctrl_intf.cc
index 8949937..2366672 100644
--- a/src/mds/mds_tipc_fctrl_intf.cc
+++ b/src/mds/mds_tipc_fctrl_intf.cc
@@ -70,7 +70,7 @@ std::map<uint64_t, TipcPortId*> portid_map;
 std::mutex portid_map_mutex;

 // probation timer event to enable flow control at receivers
-const int64_t kBaseTimerInt = 200;  // in centisecond
+const int64_t kBaseTimerInt = 100;  // in centisecond
 const uint8_t kTxProbMaxRetries = 10;
 Timer txprob_timer(Event::Type::kEvtTmrTxProb);

@@ -97,7 +97,7 @@ void tmr_exp_cbk(void* uarg) {
   }
 }

-void process_timer_event(const Event evt) {
+void process_timer_event(const Event& evt) {
   bool txprob_restart = false;
   for (auto i : portid_map) {
     TipcPortId* portid = i.second;
@@ -118,18 +118,18 @@ void process_timer_event(const Event evt) {
   }
 }

-uint32_t process_flow_event(const Event evt) {
+uint32_t process_flow_event(const Event& evt) {
   uint32_t rc = NCSCC_RC_SUCCESS;
   TipcPortId *portid = portid_lookup(evt.id_);
   if (portid == nullptr) {
     if (evt.type_ == Event::Type::kEvtRcvData) {
-      portid = new TipcPortId(evt.id_, data_sock_fd, kChunkAckSize,
-          sock_buf_size);
+      portid = new TipcPortId(evt.id_, data_sock_fd,
+          kChunkAckSize, sock_buf_size);
       portid_map[TipcPortId::GetUniqueId(evt.id_)] = portid;
-      if (evt.type_ == Event::Type::kEvtRcvData) {
-        rc = portid->ReceiveData(evt.mseq_, evt.mfrag_,
+      rc = portid->ReceiveData(evt.mseq_, evt.mfrag_,
             evt.fseq_, evt.svc_id_);
-      }
+    } else {
+      m_MDS_LOG_ERR("PortId not found for evt:%d", (int)evt.type_);
     }
   } else {
     if (evt.type_ == Event::Type::kEvtRcvData) {
@@ -154,13 +154,14 @@ uint32_t process_all_events(void) {
   enum { FD_FCTRL = 0, NUM_FDS };

   int poll_tmo = kChunkAckTimeout;
+  struct pollfd pfd[NUM_FDS] = {{0, 0, 0}};
+
+  pfd[FD_FCTRL].fd =
+      ncs_ipc_get_sel_obj(&mbx_events).rmv_obj;
+  pfd[FD_FCTRL].events = POLLIN;
+
   while (true) {
     int pollres;
-    struct pollfd pfd[NUM_FDS] = {{0}};
-
-    pfd[FD_FCTRL].fd =
-        ncs_ipc_get_sel_obj(&mbx_events).rmv_obj;
-    pfd[FD_FCTRL].events = POLLIN;

     pollres = poll(pfd, NUM_FDS, poll_tmo);

@@ -172,13 +173,13 @@ uint32_t process_all_events(void) {

     if (pollres > 0) {
       if (pfd[FD_FCTRL].revents == POLLIN) {
+        portid_map_mutex.lock();
+
         Event *evt = reinterpret_cast<Event*>(ncs_ipc_non_blk_recv(
             &mbx_events));

         if (evt == nullptr) continue;

-        portid_map_mutex.lock();
-
         if (evt->IsTimerEvent()) {
           process_timer_event(*evt);
         }
@@ -212,12 +213,14 @@ uint32_t create_ncs_task(void *task_hdl) {
   }
   if (m_NCS_IPC_ATTACH(&mbx_events) != NCSCC_RC_SUCCESS) {
     m_MDS_LOG_ERR("m_NCS_IPC_ATTACH failed");
+    m_NCS_IPC_RELEASE(&mbx_events, nullptr);
     return NCSCC_RC_FAILURE;
   }
   if (ncs_task_create((NCS_OS_CB)process_all_events, 0,
           "OSAF_MDS", prio_val, policy, NCS_MDTM_STACKSIZE,
           &task_hdl) != NCSCC_RC_SUCCESS) {
-    m_MDS_LOG_ERR("FCTRL: Task Creation-failed:\n");
+    m_MDS_LOG_ERR("FCTRL: ncs_task_create() failed\n");
+    m_NCS_IPC_RELEASE(&mbx_events, nullptr);
     return NCSCC_RC_FAILURE;
   }

@@ -230,18 +233,20 @@ uint32_t create_ncs_task(void *task_hdl) {
 uint32_t mds_tipc_fctrl_initialize(int dgramsock, struct tipc_portid id,
     uint64_t rcv_buf_size, int32_t ackto, int32_t acksize,
     bool mcast_enabled) {
-  if (create_ncs_task(&p_task_hdl) !=
-      NCSCC_RC_SUCCESS) {
-    m_MDS_LOG_ERR("FCTRL: Start of the Created Task-failed:\n");
-    return NCSCC_RC_FAILURE;
-  }
+
   data_sock_fd = dgramsock;
   snd_rcv_portid = id;
   sock_buf_size = rcv_buf_size;
-  is_fctrl_enabled = true;
   is_mcast_enabled = mcast_enabled;
   if (ackto != -1) kChunkAckTimeout = ackto;
   if (acksize != -1) kChunkAckSize = acksize;
+
+  if (create_ncs_task(&p_task_hdl) !=
+      NCSCC_RC_SUCCESS) {
+    m_MDS_LOG_ERR("FCTRL: create_ncs_task() failed\n");
+    return NCSCC_RC_FAILURE;
+  }
+  is_fctrl_enabled = true;
   m_MDS_LOG_NOTIFY("FCTRL: Initialize [node:%x, ref:%u]",
       id.node, id.ref);

@@ -250,13 +255,29 @@ uint32_t mds_tipc_fctrl_initialize(int dgramsock, struct tipc_portid id,

 uint32_t mds_tipc_fctrl_shutdown(void) {
   if (is_fctrl_enabled == false) return NCSCC_RC_SUCCESS;
+
+  portid_map_mutex.lock();
+
   if (ncs_task_release(p_task_hdl) != NCSCC_RC_SUCCESS) {
     m_MDS_LOG_ERR("FCTRL: Stop of the Created Task-failed:\n");
   }
+
+  m_NCS_IPC_DETACH(&mbx_events, nullptr, nullptr);
+  m_NCS_IPC_RELEASE(&mbx_events, nullptr);
+
+  for (auto i : portid_map) delete i.second;
+  portid_map.clear();
+
+  portid_map_mutex.unlock();
+  is_fctrl_enabled = false;
+
+  m_MDS_LOG_NOTIFY("FCTRL: Shutdown [node:%x, ref:%u]",
+      snd_rcv_portid.node, snd_rcv_portid.ref);
+
   return NCSCC_RC_SUCCESS;
 }

-uint32_t mds_tipc_fctrl_sndqueue_capable(struct tipc_portid id, uint16_t len,
+uint32_t mds_tipc_fctrl_sndqueue_capable(struct tipc_portid id,
           uint16_t* next_seq) {
   if (is_fctrl_enabled == false) return NCSCC_RC_SUCCESS;

diff --git a/src/mds/mds_tipc_fctrl_intf.h b/src/mds/mds_tipc_fctrl_intf.h
index c798b93..ed9c6a8 100644
--- a/src/mds/mds_tipc_fctrl_intf.h
+++ b/src/mds/mds_tipc_fctrl_intf.h
@@ -37,7 +37,7 @@ uint32_t mds_tipc_fctrl_portid_down(struct tipc_portid id, uint32_t type);
 uint32_t mds_tipc_fctrl_portid_terminate(struct tipc_portid id);
 uint32_t mds_tipc_fctrl_drop_data(uint8_t *buffer, uint16_t len,
     struct tipc_portid id);
-uint32_t mds_tipc_fctrl_sndqueue_capable(struct tipc_portid id, uint16_t len,
+uint32_t mds_tipc_fctrl_sndqueue_capable(struct tipc_portid id,
     uint16_t* next_seq);
 uint32_t mds_tipc_fctrl_trysend(const uint8_t *buffer, uint16_t len,
     struct tipc_portid id);
diff --git a/src/mds/mds_tipc_fctrl_msg.cc b/src/mds/mds_tipc_fctrl_msg.cc
index abd38d3..064d977 100644
--- a/src/mds/mds_tipc_fctrl_msg.cc
+++ b/src/mds/mds_tipc_fctrl_msg.cc
@@ -31,17 +31,17 @@ void HeaderMessage::Encode(uint8_t *msg) {
   uint8_t *ptr;

   // encode message length
-  ptr = &msg[0];
+  ptr = &msg[HeaderMessage::FieldIndex::kMessageLength];
   ncs_encode_16bit(&ptr, msg_len_);
   // encode sequence number
-  ptr = &msg[2];
+  ptr = &msg[HeaderMessage::FieldIndex::kSequenceNumber];
   ncs_encode_32bit(&ptr, mseq_);
-  // encode sequence number
-  ptr = &msg[6];
+  // encode fragment number
+  ptr = &msg[HeaderMessage::FieldIndex::kFragmentNumber];
   ncs_encode_16bit(&ptr, mfrag_);
   // skip length_check: oct8&9
   // encode protocol version
-  ptr = &msg[10];
+  ptr = &msg[HeaderMessage::FieldIndex::kProtocolVersion];
   ncs_encode_8bit(&ptr, MDS_PROT_FCTRL);
 }

@@ -49,32 +49,32 @@ void HeaderMessage::Decode(uint8_t *msg) {
   uint8_t *ptr;

   // decode message length
-  ptr = &msg[0];
+  ptr = &msg[HeaderMessage::FieldIndex::kMessageLength];
   msg_len_ = ncs_decode_16bit(&ptr);
   // decode sequence number
-  ptr = &msg[2];
+  ptr = &msg[HeaderMessage::FieldIndex::kSequenceNumber];
   mseq_ = ncs_decode_32bit(&ptr);
   // decode fragment number
-  ptr = &msg[6];
+  ptr = &msg[HeaderMessage::FieldIndex::kFragmentNumber];
   mfrag_ = ncs_decode_16bit(&ptr);
   // decode protocol version
-  ptr = &msg[10];
+  ptr = &msg[HeaderMessage::FieldIndex::kProtocolVersion];
   pro_ver_ = ncs_decode_8bit(&ptr);
   if ((pro_ver_ & MDS_PROT_VER_MASK) == MDS_PROT_FCTRL) {
     // decode flow control sequence number
-    ptr = &msg[8];
+    ptr = &msg[HeaderMessage::FieldIndex::kFlowControlSequenceNumber];
     fseq_ = ncs_decode_16bit(&ptr);
     // decode protocol identifier
-    ptr = &msg[11];
+    ptr = &msg[ChunkAck::FieldIndex::kProtocolIdentifier];
     pro_id_ = ncs_decode_32bit(&ptr);
     if (pro_id_ == MDS_PROT_FCTRL_ID) {
       // decode message type
-      ptr = &msg[15];
+      ptr = &msg[ChunkAck::FieldIndex::kFlowControlMessageType];
       msg_type_ = ncs_decode_8bit(&ptr);
     }
   } else {
     if (mfrag_ != 0) {
-      ptr = &msg[8];
+      ptr = &msg[HeaderMessage::FieldIndex::kFlowControlSequenceNumber];
       fseq_ = ncs_decode_16bit(&ptr);
       if (fseq_ != 0) pro_ver_ = MDS_PROT_FCTRL;
     }
@@ -90,7 +90,7 @@ void DataMessage::Decode(uint8_t *msg) {
              MDS_HEADER_RCVR_SVC_ID_POSITION];
   svc_id_ = ncs_decode_16bit(&ptr);
   // decode snd_type
-  ptr = &msg[17];
+  ptr = &msg[DataMessage::FieldIndex::kSendType];
   snd_type_ = (ncs_decode_8bit(&ptr)) & 0x3f;
 }

@@ -109,19 +109,19 @@ ChunkAck::ChunkAck(uint16_t svc_id, uint16_t fseq, uint16_t chunk_size):
 void ChunkAck::Encode(uint8_t *msg) {
   uint8_t *ptr;
   // encode protocol identifier
-  ptr = &msg[11];
+  ptr = &msg[ChunkAck::FieldIndex::kProtocolIdentifier];
   ncs_encode_32bit(&ptr, MDS_PROT_FCTRL_ID);
   // encode message type
-  ptr = &msg[15];
+  ptr = &msg[ChunkAck::FieldIndex::kFlowControlMessageType];
   ncs_encode_8bit(&ptr, kChunkAckMsgType);
   // encode service id
-  ptr = &msg[16];
+  ptr = &msg[ChunkAck::FieldIndex::kServiceId];
   ncs_encode_16bit(&ptr, svc_id_);
   // encode flow control sequence number
-  ptr = &msg[18];
+  ptr = &msg[ChunkAck::FieldIndex::kFlowControlSequenceNumber];
   ncs_encode_16bit(&ptr, acked_fseq_);
   // encode chunk size
-  ptr = &msg[20];
+  ptr = &msg[ChunkAck::FieldIndex::kChunkAckSize];
   ncs_encode_16bit(&ptr, chunk_size_);
 }

@@ -129,13 +129,13 @@ void ChunkAck::Decode(uint8_t *msg) {
   uint8_t *ptr;

   // decode service id
-  ptr = &msg[16];
+  ptr = &msg[ChunkAck::FieldIndex::kServiceId];
   svc_id_ = ncs_decode_16bit(&ptr);
   // decode flow control sequence number
-  ptr = &msg[18];
+  ptr = &msg[ChunkAck::FieldIndex::kFlowControlSequenceNumber];
   acked_fseq_ = ncs_decode_16bit(&ptr);
   // decode chunk size
-  ptr = &msg[20];
+  ptr = &msg[ChunkAck::FieldIndex::kChunkAckSize];
   chunk_size_ = ncs_decode_16bit(&ptr);
 }

diff --git a/src/mds/mds_tipc_fctrl_msg.h b/src/mds/mds_tipc_fctrl_msg.h
index e6b9662..d67ed19 100644
--- a/src/mds/mds_tipc_fctrl_msg.h
+++ b/src/mds/mds_tipc_fctrl_msg.h
@@ -71,8 +71,8 @@ class Event {
     fseq_(f_seg_num), chunk_size_(chunk_size) {
     type_ = type;
   }
-  bool IsTimerEvent() { return (type_ > Type::kEvtTmrAll); }
-  bool IsFlowEvent() {
+  bool IsTimerEvent() const { return (type_ > Type::kEvtTmrAll); }
+  bool IsFlowEvent() const {
     return (Type::kEvtDataFlowAll < type_ && type_ < Type::kEvtTmrAll);
   }
 };
@@ -86,6 +86,14 @@ class BaseMessage {

 class HeaderMessage: public BaseMessage {
  public:
+  enum FieldIndex {
+    kMessageLength = 0,
+    kSequenceNumber = 2,
+    kFragmentNumber = 6,
+    kLengthCheck = 8,
+    kFlowControlSequenceNumber = kLengthCheck,  // reuse kLengthCheck
+    kProtocolVersion = 10
+  };
   uint8_t* msg_ptr_{nullptr};
   uint16_t msg_len_{0};
   uint32_t mseq_{0};
@@ -104,6 +112,9 @@ class HeaderMessage: public BaseMessage {

 class DataMessage: public BaseMessage {
  public:
+  enum FieldIndex {
+    kSendType = 17,
+  };
   HeaderMessage header_;
   uint16_t svc_id_{0};

@@ -118,6 +129,13 @@ class DataMessage: public BaseMessage {

 class ChunkAck: public BaseMessage {
  public:
+  enum FieldIndex {
+    kProtocolIdentifier = 11,
+    kFlowControlMessageType = 15,
+    kServiceId = 16,
+    kFlowControlSequenceNumber = 18,
+    kChunkAckSize = 20
+  };
   static const uint8_t kChunkAckMsgType = 1;
   static const uint16_t kChunkAckMsgLength = 22;

diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc
index 365d72f..1ce792d 100644
--- a/src/mds/mds_tipc_fctrl_portid.cc
+++ b/src/mds/mds_tipc_fctrl_portid.cc
@@ -30,6 +30,7 @@ Timer::Timer(Event::Type type) {
 }

 Timer::~Timer() {
+  // do not call Stop
 }

 void Timer::Start(int64_t period, void (*tmr_exp_func)(void*)) {
@@ -57,8 +58,8 @@ void MessageQueue::Queue(DataMessage* msg) {
 }

 DataMessage* MessageQueue::Find(uint32_t mseq, uint16_t mfrag) {
-  for (auto it = queue_.begin(); it != queue_.end(); ++it) {
-    DataMessage *m = *it;
+  for (const auto& it : queue_) {
+    DataMessage *m = it;
     if (m->header_.mseq_ == mseq && m->header_.mfrag_ == mfrag) {
       return m;
     }
@@ -83,8 +84,8 @@ uint64_t MessageQueue::Erase(Seq16 fseq_from, Seq16 fseq_to) {
 }

 DataMessage* MessageQueue::FirstUnsent() {
-  for (auto it = queue_.begin(); it != queue_.end(); ++it) {
-    DataMessage *m = *it;
+  for (const auto& it : queue_) {
+    DataMessage *m = it;
     if (m->is_sent_ == false) {
       return m;
     }
@@ -93,8 +94,8 @@ DataMessage* MessageQueue::FirstUnsent() {
 }

 void MessageQueue::MarkUnsentFrom(Seq16 fseq) {
-  for (auto it = queue_.begin(); it != queue_.end(); ++it) {
-    DataMessage *m = *it;
+  for (const auto& it : queue_) {
+    DataMessage *m = it;
     if (Seq16(m->header_.fseq_) >= fseq) m->is_sent_ = false;
   }
 }
@@ -118,7 +119,6 @@ TipcPortId::~TipcPortId() {
   ReceiveTmrChunkAck();
   // flush all unsent msg in sndqueue_
   FlushData();
-  sndqueue_.Clear();
 }

 uint64_t TipcPortId::GetUniqueId(struct tipc_portid id) {
@@ -143,6 +143,7 @@ void TipcPortId::FlushData() {
           sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
     }
   } while (msg != nullptr);
+  sndqueue_.Clear();
 }

 uint32_t TipcPortId::Send(uint8_t* data, uint16_t length) {
@@ -203,10 +204,20 @@ bool TipcPortId::ReceiveCapable(uint16_t sending_len) {
   if (sndwnd_.nacked_space_ + sending_len < rcv_buf_size_) {
     return true;
   } else {
-    state_ = State::kRcvBuffOverflow;
-    m_MDS_LOG_DBG("FCTRL: [node:%x, ref:%u] --> Overflow, %" PRIu64
-        ", %u, %" PRIu64, id_.node, id_.ref, sndwnd_.nacked_space_,
-        sending_len, rcv_buf_size_);
+    if (state_ == State::kTxProb) {
+      // Too many msgs are not acked by receiver while in txprob state
+      // disable flow control
+      state_ = State::kDisabled;
+      m_MDS_LOG_ERR("FCTRL: [node:%x, ref:%u] --> Disabled, %" PRIu64
+          ", %u, %" PRIu64, id_.node, id_.ref, sndwnd_.nacked_space_,
+          sending_len, rcv_buf_size_);
+      return true;
+    } else if (state_ == State::kEnabled) {
+      state_ = State::kRcvBuffOverflow;
+      m_MDS_LOG_NOTIFY("FCTRL: [node:%x, ref:%u] --> Overflow, %" PRIu64
+          ", %u, %" PRIu64, id_.node, id_.ref, sndwnd_.nacked_space_,
+          sending_len, rcv_buf_size_);
+    }
     return false;
   }
 }
@@ -242,7 +253,7 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
   // update state
   if (state_ == State::kTxProb || state_ == State::kStartup) {
     state_ = State::kEnabled;
-    m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
+    m_MDS_LOG_NOTIFY("FCTRL: [me] <-- [node:%x, ref:%u], "
         "RcvData, TxProb[retries:%u, state:%u]",
         id_.node, id_.ref,
         txprob_cnt_, (uint8_t)state_);
@@ -331,7 +342,7 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) {
   // update state
   if (state_ == State::kTxProb) {
     state_ = State::kEnabled;
-    m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
+    m_MDS_LOG_NOTIFY("FCTRL: [me] <-- [node:%x, ref:%u], "
         "RcvChkAck, "
         "TxProb[retries:%u, state:%u]",
         id_.node, id_.ref,
@@ -358,7 +369,7 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) {
     sndwnd_.nacked_space_ -= acked_bytes;

     // try to send a few pending msg
-    DataMessage* msg;
+    DataMessage* msg = nullptr;
     uint64_t resend_bytes = 0;
     while (resend_bytes < acked_bytes) {
       // find the lowest sequence unsent yet
@@ -386,7 +397,7 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) {
     // no more unsent message, back to kEnabled
     if (msg == nullptr && state_ == State::kRcvBuffOverflow) {
       state_ = State::kEnabled;
-      m_MDS_LOG_DBG("FCTRL: [node:%x, ref:%u] Overflow --> Enabled ",
+      m_MDS_LOG_NOTIFY("FCTRL: [node:%x, ref:%u] Overflow --> Enabled ",
           id_.node, id_.ref);
     }
   } else {
@@ -423,7 +434,7 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag,
   }
   if (state_ != State::kRcvBuffOverflow) {
     state_ = State::kRcvBuffOverflow;
-    m_MDS_LOG_DBG("FCTRL: [node:%x, ref:%u] --> Overflow ",
+    m_MDS_LOG_NOTIFY("FCTRL: [node:%x, ref:%u] --> Overflow ",
         id_.node, id_.ref);
     sndqueue_.MarkUnsentFrom(Seq16(fseq));
   }
@@ -466,10 +477,9 @@ bool TipcPortId::ReceiveTmrTxProb(uint8_t max_txprob) {
     // receiver is at old mds version
     if (state_ == State::kDisabled) {
       FlushData();
-      sndqueue_.Clear();
     }

-    m_MDS_LOG_DBG("FCTRL: [node:%x, ref:%u], "
+    m_MDS_LOG_NOTIFY("FCTRL: [node:%x, ref:%u], "
         "TxProbExp, TxProb[retries:%u, state:%u]",
         id_.node, id_.ref,
         txprob_cnt_, (uint8_t)state_);
--
2.7.4

On 14/8/19 4:38 pm, Minh Chau wrote:
Summary: mds: Add solution for TIPC buffer overflow at MDS [#1960]
Review request for Ticket(s): 1960
Peer Reviewer(s): Anders, HansN, Lennart, Gary, Vu, Thuan
Pull request to: *** LIST THE PERSON WITH PUSH ACCESS HERE ***
Affected branch(es): develop
Development branch: ticket-1960
Base revision: 2d85d5d9264c6a7d1c6601b900fb810facbee3ac
Personal repository: git://git.code.sf.net/u/minh-chau/review

--------------------------------
Impacted area       Impact y/n
--------------------------------
  Docs                    n
  Build system            n
  RPM/packaging           n
  Configuration files     n
  Startup scripts         n
  SAF services            y
  OpenSAF services        n
  Core libraries          y
  Samples                 n
  Tests                   n
  Other                   n

NOTE: Patch(es) contain lines longer than 80 characters

Comments (indicate scope for each "y" above):
---------------------------------------------
Sending on behalf of Thuan & Minh.
Some pending tasks to accomplish:
- Handle broadcast/multicast MDS messages with flow control.
- Reduce the memory re-allocation overhead when flow control is enabled.
  (At the moment, memory is allocated in mds_dt_tipc.c and then cloned again
  into a buffer for the retransmission queue.)
- The sequence number arithmetic (SNA) should be implemented in the /base code.
- Add mdstest test cases to cover SNA wrap-around.
- MDS_CHECKSUM_ENABLE_FLAG

revision d208cff344c35afd25ac01ab4f9057d153fe9495
Author: Minh Chau <minh.c...@dektech.com.au>
Date:   Wed, 14 Aug 2019 16:22:12 +1000

mds: Add TIPC buffer overflow for mdstest [#1960]

(Sending on behalf of Thuan)



revision d04c3e99744b86278c6a54d8ec1e4caabfbcabd2
Author: Minh Chau <minh.c...@dektech.com.au>
Date:   Wed, 14 Aug 2019 16:22:12 +1000

mds: Apply serial number arithmetic for sequence counter [#1960]

This patch applies the serial number arithmetic for the flow control
sequence number, referenced to RFC1982.

This is only a temporary patch; a proper one could be made in /base
as a template for other types, e.g. uint32. MDS would then reuse it
from /base.



revision 7bbaf4eed324ba0575f4893a37eb10c9b7df4426
Author: Minh Chau <minh.c...@dektech.com.au>
Date:   Wed, 14 Aug 2019 16:22:12 +1000

mds: Add configurable parameters [#1960]

This patch makes the TIPC buffer overflow solution configurable,
as well as the ack timeout and ack size.
For example:
The service config file can export the following environment variables

export MDS_TIPC_FCTRL_ENABLED=1
export MDS_TIPC_FCTRL_ACKTIMEOUT=1000
export MDS_TIPC_FCTRL_ACKSIZE=1

If MDS_TIPC_FCTRL_ACKTIMEOUT, MDS_TIPC_FCTRL_ACKSIZE are not specified,
the default values are used.



revision 842dcecbdbafb3744a99ead42899f955eb79f94f
Author: Minh Chau <minh.c...@dektech.com.au>
Date:   Wed, 14 Aug 2019 16:22:12 +1000

mds: Implement kRcvBuffOverflow state [#1960]

This patch implements the kRcvBuffOverflow state machine as
described in README file.



revision ed03d11a194f9775fc43003acc35924ef28227b7
Author: Minh Chau <minh.c...@dektech.com.au>
Date:   Wed, 14 Aug 2019 16:22:12 +1000

mds: Add state machine for tipc portid instance [#1960]

This patch adds state machine to support tx probation timer.



revision 759f7ef7b55a8f27118e82c757c21b2f452c8b7c
Author: Minh Chau <minh.c...@dektech.com.au>
Date:   Wed, 14 Aug 2019 16:22:12 +1000

mds: Add timeout for ack message [#1960]

If the ack size is configured to be greater than 1, there should be a
timeout at the receiver end to send the ack message back to the sender.
The ack message timeout uses the poll timeout in the flow control thread
to keep MDS lightweight (instead of adding extra timer threads).



revision 7c115e411f9de31b2ffe3c32ed401d7b8a3de696
Author: Minh Chau <minh.c...@dektech.com.au>
Date:   Wed, 14 Aug 2019 16:21:33 +1000

mds: Add implementation for TIPC buffer overflow solution [#1960]

This is a collaborative patch by two contributors: Thuan and Minh.

Main changes:
- Add mds_tipc_fctrl_intf.h, mds_tipc_fctrl_intf.cc: These two files
introduce new functions which are called in mds_dt_tipc.c if the flow
control is enabled
- Add mds_tipc_fctrl_portid.h, mds_tipc_fctrl_portid.cc: These files
implement the TIPC portid instance, which supports the sliding window
and the MDS message queue
- Add mds_tipc_fctrl_msg.h, mds_tipc_fctrl_msg.cc: These files define
the event and messages which are used for this solution.



revision 4096f66de92f89ef30a54a137465384b74143800
Author: Minh Chau <minh.c...@dektech.com.au>
Date:   Wed, 14 Aug 2019 16:12:19 +1000

mds: Resolve c/c++ linking issue [#1960]

(Sending on behalf of Thuan)
This patch solves the linking issue if mds_dt.h or mds_core.h
is included in c++ sources.



revision 04152d708f30abf813dad1c18d5ed3d73df4ef3d
Author: Minh Chau <minh.c...@dektech.com.au>
Date:   Wed, 14 Aug 2019 16:11:17 +1000

mds: Add README for solution of TIPC buffer overflow at MDS [#1960]



Added Files:
------------
  src/mds/mds_tipc_fctrl_intf.cc
  src/mds/mds_tipc_fctrl_intf.h
  src/mds/mds_tipc_fctrl_msg.cc
  src/mds/mds_tipc_fctrl_msg.h
  src/mds/mds_tipc_fctrl_portid.cc
  src/mds/mds_tipc_fctrl_portid.h
  src/mds/README


Complete diffstat:
------------------
  src/mds/Makefile.am              |  10 +-
  src/mds/README                   | 221 ++++++++++++++++++
  src/mds/apitest/mdstest.c        |   5 +-
  src/mds/apitest/mdstipc.h        |   6 +-
  src/mds/apitest/mdstipc_api.c    | 237 +++++++++++++++++++
  src/mds/apitest/mdstipc_conf.c   |  19 +-
  src/mds/mds_core.h               |  74 ++++++
  src/mds/mds_dt.h                 |  12 +-
  src/mds/mds_dt2c.h               |  67 ------
  src/mds/mds_dt_tcp.c             |   2 +
  src/mds/mds_dt_tcp.h             |   1 -
  src/mds/mds_dt_tipc.c            | 203 +++++++++++-----
  src/mds/mds_tipc_fctrl_intf.cc   | 467 ++++++++++++++++++++++++++++++++++++
  src/mds/mds_tipc_fctrl_intf.h    |  48 ++++
  src/mds/mds_tipc_fctrl_msg.cc    | 142 +++++++++++
  src/mds/mds_tipc_fctrl_msg.h     | 137 +++++++++++
  src/mds/mds_tipc_fctrl_portid.cc | 493 +++++++++++++++++++++++++++++++++++++++
  src/mds/mds_tipc_fctrl_portid.h  | 176 ++++++++++++++
  18 files changed, 2172 insertions(+), 148 deletions(-)


Testing Commands:
-----------------
all service's legacy apitest testcases
mdstest 27


Testing, Expected Results:
--------------------------
all pass


Conditions of Submission:
-------------------------
*** HOW MANY DAYS BEFORE PUSHING, CONSENSUS ETC ***


Arch      Built     Started    Linux distro
-------------------------------------------
mips        n          n
mips64      n          n
x86         n          n
x86_64      n          n
powerpc     n          n
powerpc64   n          n


Reviewer Checklist:
-------------------
[Submitters: make sure that your review doesn't trigger any checkmarks!]


Your checkin has not passed review because (see checked entries):

___ Your RR template is generally incomplete; it has too many blank entries
     that need proper data filled in.

___ You have failed to nominate the proper persons for review and push.

___ Your patches do not have proper short+long header

___ You have grammar/spelling in your header that is unacceptable.

___ You have exceeded a sensible line length in your headers/comments/text.

___ You have failed to put in a proper Trac Ticket # into your commits.

___ You have incorrectly put/left internal data in your comments/files
     (i.e. internal bug tracking tool IDs, product names etc)

___ You have not given any evidence of testing beyond basic build tests.
     Demonstrate some level of runtime or other sanity testing.

___ You have ^M present in some of your files. These have to be removed.

___ You have needlessly changed whitespace or added whitespace crimes
     like trailing spaces, or spaces before tabs.

___ You have mixed real technical changes with whitespace and other
     cosmetic code cleanup changes. These have to be separate commits.

___ You need to refactor your submission into logical chunks; there is
     too much content into a single commit.

___ You have extraneous garbage in your review (merge commits etc)

___ You have giant attachments which should never have been sent;
     Instead you should place your content in a public tree to be pulled.

___ You have too many commits attached to an e-mail; resend as threaded
     commits, or place in a public tree for a pull.

___ You have resent this content multiple times without a clear indication
     of what has changed between each re-send.

___ You have failed to adequately and individually address all of the
     comments and change requests that were proposed in the initial review.

___ You have a misconfigured ~/.gitconfig file (i.e. user.name, user.email etc)

___ Your computer has a badly configured date and time, confusing
     the threaded patch review.

___ Your changes affect IPC mechanism, and you don't present any results
     for in-service upgradability test.

___ Your changes affect the user manual and documentation, but your patch
     series does not contain the patch that updates the Doxygen manual.





_______________________________________________
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel

Reply via email to