- Store and check the last received fseq so that a PortId reset
scenario is not mistakenly detected.
- Skip an invalid Nack so that the sender does not mistakenly move
to the overflow state and queue all later messages while the
receiver gets no further message that would make it send a ChunkAck.
- Update tet_receiver() to keep polling instead of giving up on the
first poll timeout, as the sender's sendto() may take a long time to
return when it is running out of memory.
- Update tet_sender() to slow down sending when both the number of
messages and the message size are large, so that the kernel does not
kill it for using too much memory.
- Do not return an error if the PortId is not found when checking
send queue capability, to avoid an agent crash after the fix for
#3208 when the agent enables MDS flow control.
---
 src/mds/apitest/mdstipc_api.c    | 15 +++++----------
 src/mds/mds_tipc_fctrl_intf.cc   | 13 ++-----------
 src/mds/mds_tipc_fctrl_portid.cc | 17 +++++++++++++++--
 src/mds/mds_tipc_fctrl_portid.h  |  1 +
 4 files changed, 23 insertions(+), 23 deletions(-)

diff --git a/src/mds/apitest/mdstipc_api.c b/src/mds/apitest/mdstipc_api.c
index 3dd1a3dc0..ca5674b1b 100644
--- a/src/mds/apitest/mdstipc_api.c
+++ b/src/mds/apitest/mdstipc_api.c
@@ -13413,6 +13413,8 @@ void tet_sender(MDS_SVC_ID svc_id, uint32_t msg_num, uint32_t msg_size,
                                        " successfully\n", i);
                        }
                }
+               if (msg_num > 65535 && msg_size > 10000)
+                       usleep(100); // Slow down to avoid reaped by OOM killer
        }
        free(mesg);
        while (1) {
@@ -13459,7 +13461,7 @@ int tet_receiver(MDS_SVC_ID svc_id, uint32_t msg_num,
        sel.fd = m_GET_FD_FROM_SEL_OBJ(gl_tet_adest.svc[0].sel_obj);
        sel.events = POLLIN;
        while (1) {
-               int ret = osaf_poll(&sel, 1, 10000);
+               int ret = osaf_poll(&sel, 1, 1000);
                if (ret > 0) {
                        gl_rcvdmsginfo.msg = NULL;
                        if (mds_service_retrieve(gl_tet_adest.mds_pwe1_hdl,
@@ -13486,19 +13488,12 @@ int tet_receiver(MDS_SVC_ID svc_id, uint32_t msg_num,
                                }
                                free(msg);
                        }
-               } else {
+               } else if (verify_counters(msg_num)) {
+                       printf("\nReceiver: get enough %d messages\n", msg_num);
                        break;
                }
        }
 
-       printf("\nReceiver verify number of received messages\n");
-       if (!verify_counters(msg_num)) {
-               printf("\nReceiver: Not get enough %d messages\n", msg_num);
-               free(expected_buff);
-               reset_counters();
-               return 1;
-       }
-
        printf("\nEnd Receiver (pid:%d) svc_id=%d\n",
                        (int)getpid(), svc_id);
        free(expected_buff);
diff --git a/src/mds/mds_tipc_fctrl_intf.cc b/src/mds/mds_tipc_fctrl_intf.cc
index 93bfce51c..34069a262 100644
--- a/src/mds/mds_tipc_fctrl_intf.cc
+++ b/src/mds/mds_tipc_fctrl_intf.cc
@@ -351,26 +351,17 @@ uint32_t mds_tipc_fctrl_sndqueue_capable(struct tipc_portid id,
           uint16_t* next_seq) {
   if (is_fctrl_enabled == false) return NCSCC_RC_SUCCESS;
 
-  uint32_t rc = NCSCC_RC_SUCCESS;
-
   portid_map_mutex.lock();
 
   TipcPortId *portid = portid_lookup(id);
-  if (portid == nullptr) {
-    m_MDS_LOG_ERR("FCTRL: [me] --> [node:%x, ref:%u], "
-        "[line:%u], Error[PortId not found]",
-        id.node, id.ref, __LINE__);
-    rc = NCSCC_RC_FAILURE;
-  } else {
-    if (portid->state_ != TipcPortId::State::kDisabled) {
+  if (portid && portid->state_ != TipcPortId::State::kDisabled) {
       // assign the sequence number of the outgoing message
       *next_seq = portid->GetCurrentSeq();
-    }
   }
 
   portid_map_mutex.unlock();
 
-  return rc;
+  return NCSCC_RC_SUCCESS;
 }
 
 uint32_t mds_tipc_fctrl_trysend(struct tipc_portid id, const uint8_t *buffer,
diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc
index 41fce3df8..06c8d18d1 100644
--- a/src/mds/mds_tipc_fctrl_portid.cc
+++ b/src/mds/mds_tipc_fctrl_portid.cc
@@ -371,7 +371,8 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
 
     // check for transmission error
     if (rcvwnd_.rcv_ + 1 < Seq16(fseq)) {
-      if (rcvwnd_.rcv_ == 0 && rcvwnd_.acked_ == 0) {
+      if (rcvwnd_.rcv_ == 0 && rcvwnd_.acked_ == 0 &&
+          last_rcv_fseq_ != Seq16(0)) {
         // peer does not realize that this portid reset
         m_MDS_LOG_NOTIFY("FCTRL: [me] <-- [node:%x, ref:%u], "
             "RcvData[mseq:%u, mfrag:%u, fseq:%u], "
@@ -397,7 +398,7 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
         // send nack
         SendNack((rcvwnd_.rcv_ + 1).v(), svc_id);
       }
-    } else if (fseq == 1) {
+    } else if (fseq == 1 && last_rcv_fseq_ != Seq16(0)) {
       // sender realize me as portid reset
       m_MDS_LOG_NOTIFY("FCTRL: [me] <-- [node:%x, ref:%u], "
           "RcvData[mseq:%u, mfrag:%u, fseq:%u], "
@@ -422,6 +423,7 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
           rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_);
     }
   }
+  last_rcv_fseq_ = Seq16(fseq);
   return rc;
 }
 
@@ -509,6 +511,17 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag,
         fseq);
     return;
   }
+
+  if (Seq16(fseq) <= sndwnd_.acked_) {
+    m_MDS_LOG_NOTIFY("FCTRL: [me] <-- [node:%x, ref:%u], "
+        "RcvNack[fseq:%u], "
+        "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "], "
+        "Warning[Invalid Nack]",
+        id_.node, id_.ref, fseq,
+        sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
+    return;
+  }
+
   if (state_ == State::kRcvBuffOverflow) {
     sndqueue_.MarkUnsentFrom(Seq16(fseq));
     if (Seq16(fseq) - sndwnd_.acked_ > 1) {
diff --git a/src/mds/mds_tipc_fctrl_portid.h b/src/mds/mds_tipc_fctrl_portid.h
index 83564459b..5479eead3 100644
--- a/src/mds/mds_tipc_fctrl_portid.h
+++ b/src/mds/mds_tipc_fctrl_portid.h
@@ -97,6 +97,7 @@ class TipcPortId {
   uint64_t rcv_buf_size_{0};  // estimated buffer size at receiver
   bool rcving_mbcast_{false};  // a flag of receiving the bcast/mcast msg
   bool tmr_trigger_send_{false};  // a flag to use timer trigger send messages
+  Seq16 last_rcv_fseq_{1};
 
   struct sndwnd {
     // sender sequence window
-- 
2.17.1



_______________________________________________
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel

Reply via email to