This patch implements the kRcvBuffOverflow state machine as
described in the README file.
---
 src/mds/mds_tipc_fctrl_intf.cc   |   6 +-
 src/mds/mds_tipc_fctrl_msg.h     |   1 +
 src/mds/mds_tipc_fctrl_portid.cc | 137 ++++++++++++++++++++++++++++++++++-----
 src/mds/mds_tipc_fctrl_portid.h  |   5 +-
 4 files changed, 131 insertions(+), 18 deletions(-)

diff --git a/src/mds/mds_tipc_fctrl_intf.cc b/src/mds/mds_tipc_fctrl_intf.cc
index c2d0922..397114e 100644
--- a/src/mds/mds_tipc_fctrl_intf.cc
+++ b/src/mds/mds_tipc_fctrl_intf.cc
@@ -285,14 +285,16 @@ uint32_t mds_tipc_fctrl_trysend(const uint8_t *buffer, uint16_t len,
     rc = NCSCC_RC_FAILURE;
   } else {
     if (portid->state_ != TipcPortId::State::kDisabled) {
-      portid->Queue(buffer, len);
+      bool sendable = portid->ReceiveCapable(len);
+      portid->Queue(buffer, len, sendable);
       // start txprob timer for the first msg sent out
       // do not start for other states
-      if (portid->state_ == TipcPortId::State::kStartup) {
+      if (sendable && portid->state_ == TipcPortId::State::kStartup) {
         txprob_timer.Start(kBaseTimerInt, tmr_exp_cbk);
         m_MDS_LOG_DBG("FCTRL: Start txprob");
         portid->state_ = TipcPortId::State::kTxProb;
       }
+      if (sendable == false) rc = NCSCC_RC_FAILURE;
     }
   }
 
diff --git a/src/mds/mds_tipc_fctrl_msg.h b/src/mds/mds_tipc_fctrl_msg.h
index 69f8048..e6b9662 100644
--- a/src/mds/mds_tipc_fctrl_msg.h
+++ b/src/mds/mds_tipc_fctrl_msg.h
@@ -110,6 +110,7 @@ class DataMessage: public BaseMessage {
   uint8_t* msg_data_{nullptr};
   uint8_t snd_type_{0};
 
+  bool is_sent_{true};
   DataMessage() {}
   virtual ~DataMessage();
   void Decode(uint8_t *msg) override;
diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc
index 84ecee9..e762290 100644
--- a/src/mds/mds_tipc_fctrl_portid.cc
+++ b/src/mds/mds_tipc_fctrl_portid.cc
@@ -82,6 +82,23 @@ uint64_t MessageQueue::Erase(uint16_t fseq_from, uint16_t fseq_to) {
   return msg_len;
 }
 
+// Looks through queue_ from front to back and hands back the first message
+// whose is_sent_ flag is false. Returns nullptr when no queued message is
+// flagged as unsent (including when the queue is empty).
+DataMessage* MessageQueue::FirstUnsent() {
+  for (DataMessage* candidate : queue_) {
+    if (!candidate->is_sent_) return candidate;
+  }
+  return nullptr;
+}
+
+// Flags every queued message with header_.fseq_ >= @fseq as unsent.
+void MessageQueue::MarkUnsentFrom(uint16_t fseq) {
+  for (DataMessage* queued : queue_) {
+    if (queued->header_.fseq_ >= fseq) queued->is_sent_ = false;
+  }
+}
+
 void MessageQueue::Clear() {
   while (queue_.empty() == false) {
     DataMessage* msg = queue_.front();
@@ -99,7 +116,8 @@ TipcPortId::TipcPortId(struct tipc_portid id, int sock, uint16_t chksize,
 TipcPortId::~TipcPortId() {
   // Fake a TmrChunkAck event to ack all received messages
   ReceiveTmrChunkAck();
-  // clear all msg in sndqueue_
+  // flush all unsent msg in sndqueue_
+  FlushData();
   sndqueue_.Clear();
 }
 
@@ -109,6 +127,24 @@ uint64_t TipcPortId::GetUniqueId(struct tipc_portid id) {
   return uid;
 }
 
+// Sends out every message in sndqueue_ that is still flagged as unsent,
+// taking them in the order FirstUnsent() hands them back.
+void TipcPortId::FlushData() {
+  // A message is flagged as sent while the return code of Send() is
+  // ignored, so each pass shrinks the set of unsent messages and the loop
+  // ends once FirstUnsent() reports nullptr.
+  while (DataMessage* next = sndqueue_.FirstUnsent()) {
+    Send(next->msg_data_, next->header_.msg_len_);
+    next->is_sent_ = true;
+    m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
+        "FlushData[mseq:%u, mfrag:%u, fseq:%u], "
+        "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
+        id_.node, id_.ref,
+        next->header_.mseq_, next->header_.mfrag_, next->header_.fseq_,
+        sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
+  }
+}
+
 uint32_t TipcPortId::Send(uint8_t* data, uint16_t length) {
   struct sockaddr_tipc server_addr;
   ssize_t send_len = 0;
@@ -130,29 +166,49 @@ uint32_t TipcPortId::Send(uint8_t* data, uint16_t length) {
   return rc;
 }
 
-uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length) {
+uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length,
+    bool is_sent) {
   uint32_t rc = NCSCC_RC_SUCCESS;
 
   DataMessage *msg = new DataMessage;
   msg->header_.Decode(const_cast<uint8_t*>(data));
   msg->Decode(const_cast<uint8_t*>(data));
   msg->msg_data_ = new uint8_t[length];
+  msg->is_sent_ = is_sent;
   memcpy(msg->msg_data_, data, length);
   sndqueue_.Queue(msg);
-  ++sndwnd_.send_;
-  sndwnd_.nacked_space_ += length;
-  m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
-      "SndData[mseq:%u, mfrag:%u, fseq:%u, len:%u], "
-      "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
-      id_.node, id_.ref,
-      msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_, length,
-      sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
-
+  if (is_sent) {
+    ++sndwnd_.send_;
+    sndwnd_.nacked_space_ += length;
+    m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
+        "SndData[mseq:%u, mfrag:%u, fseq:%u, len:%u], "
+        "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
+        id_.node, id_.ref,
+        msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_, length,
+        sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
+  } else {
+    ++sndwnd_.send_;
+    m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
+        "QueData[mseq:%u, mfrag:%u, fseq:%u, len:%u], "
+        "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
+        id_.node, id_.ref,
+        msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_, length,
+        sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
+  }
   return rc;
 }
 
 bool TipcPortId::ReceiveCapable(uint16_t sending_len) {
-  return true;
+  if (state_ == State::kRcvBuffOverflow) return false;
+  if (sndwnd_.nacked_space_ + sending_len < rcv_buf_size_) {
+    return true;
+  } else {
+    state_ = State::kRcvBuffOverflow;
+    m_MDS_LOG_DBG("FCTRL: [node:%x, ref:%u] --> Overflow, %" PRIu64
+        ", %u, %" PRIu64, id_.node, id_.ref, sndwnd_.nacked_space_,
+        sending_len, rcv_buf_size_);
+    return false;
+  }
 }
 
 void TipcPortId::SendChunkAck(uint16_t fseq, uint16_t svc_id,
@@ -297,7 +353,41 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) {
 
     // remove a number @chksize messages out of sndqueue_ and decrease
     // the nacked_space_ of sender
-    sndwnd_.nacked_space_ -= sndqueue_.Erase(fseq - chksize + 1, fseq);
+    uint64_t acked_bytes = sndqueue_.Erase(fseq - chksize + 1, fseq);
+    sndwnd_.nacked_space_ -= acked_bytes;
+
+    // try to send a few pending msg
+    uint64_t resend_bytes = 0;
+    while (resend_bytes < acked_bytes) {
+      // find the lowest sequence unsent yet
+      DataMessage* msg = sndqueue_.FirstUnsent();
+      if (msg == nullptr) {
+        break;
+      }
+      if (Send(msg->msg_data_, msg->header_.msg_len_) != NCSCC_RC_SUCCESS) {
+        // Send failed: stop here instead of spinning on the same msg, which
+        // stays unsent and is handed back by FirstUnsent() on a later ack
+        break;
+      }
+      sndwnd_.nacked_space_ += msg->header_.msg_len_;
+      msg->is_sent_ = true;
+      resend_bytes += msg->header_.msg_len_;
+      m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
+          "SndQData[fseq:%u, len:%u], "
+          "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
+          id_.node, id_.ref,
+          msg->header_.fseq_, msg->header_.msg_len_,
+          sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
+    }
+    // back to kEnabled only when the queue really holds no unsent message;
+    // ask the queue again because the loop may not have run at all (zero
+    // acked bytes) or may have stopped right after sending the last one
+    if (state_ == State::kRcvBuffOverflow &&
+        sndqueue_.FirstUnsent() == nullptr) {
+      state_ = State::kEnabled;
+      m_MDS_LOG_DBG("FCTRL: [node:%x, ref:%u] Overflow --> Enabled ",
+          id_.node, id_.ref);
+    }
   } else {
     m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
         "RcvChkAck[fseq:%u, chunk:%u], "
@@ -322,10 +412,26 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag,
         fseq);
     return;
   }
+  if (state_ == State::kRcvBuffOverflow) {
+    m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
+        "RcvNack,  ignore[fseq:%u, state:%u]",
+        id_.node, id_.ref,
+        fseq, (uint8_t)state_);
+    sndqueue_.MarkUnsentFrom(fseq);
+    return;
+  }
+  if (state_ != State::kRcvBuffOverflow) {
+    state_ = State::kRcvBuffOverflow;
+    m_MDS_LOG_DBG("FCTRL: [node:%x, ref:%u] --> Overflow ",
+        id_.node, id_.ref);
+    sndqueue_.MarkUnsentFrom(fseq);
+  }
   DataMessage* msg = sndqueue_.Find(mseq, mfrag);
   if (msg != nullptr) {
     // Resend the msg found
-    Send(msg->msg_data_, msg->header_.msg_len_);
+    if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) {
+      msg->is_sent_ = true;
+    }
     m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
         "RsndData[mseq:%u, mfrag:%u, fseq:%u], "
         "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
@@ -346,7 +452,7 @@ bool TipcPortId::ReceiveTmrTxProb(uint8_t max_txprob) {
   if (state_ == State::kDisabled ||
       sndwnd_.acked_ > 1 ||
       rcvwnd_.rcv_ > 1) return restart_txprob;
-  if (state_ == State::kTxProb) {
+  if (state_ == State::kTxProb || state_ == State::kRcvBuffOverflow) {
     txprob_cnt_++;
     if (txprob_cnt_ >= max_txprob) {
       state_ = State::kDisabled;
@@ -358,6 +464,7 @@ bool TipcPortId::ReceiveTmrTxProb(uint8_t max_txprob) {
     // at kDisabled state, clear all message in sndqueue_,
     // receiver is at old mds version
     if (state_ == State::kDisabled) {
+      FlushData();
       sndqueue_.Clear();
     }
 
diff --git a/src/mds/mds_tipc_fctrl_portid.h b/src/mds/mds_tipc_fctrl_portid.h
index 0adb9dd..119c012 100644
--- a/src/mds/mds_tipc_fctrl_portid.h
+++ b/src/mds/mds_tipc_fctrl_portid.h
@@ -35,6 +35,8 @@ class MessageQueue {
   uint64_t Erase(uint16_t fseq_from, uint16_t fseq_to);
   uint64_t Size() const { return queue_.size(); }
   void Clear();
+  DataMessage* FirstUnsent();
+  void MarkUnsentFrom(uint16_t fseq);
  private:
   std::deque<DataMessage*> queue_;
 };
@@ -73,8 +75,9 @@ class TipcPortId {
   void ReceiveNack(uint32_t mseq, uint16_t mfrag, uint16_t fseq);
   bool ReceiveTmrTxProb(uint8_t max_txprob);
   void ReceiveTmrChunkAck();
+  void FlushData();
   uint32_t Send(uint8_t* data, uint16_t length);
-  uint32_t Queue(const uint8_t* data, uint16_t length);
+  uint32_t Queue(const uint8_t* data, uint16_t length, bool is_sent);
 
   uint16_t svc_cnt_{1};  // number of service subscribed on this portid
 
-- 
2.7.4



_______________________________________________
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel

Reply via email to