Hi Vu,

Agree with your comments. Any comments for patches 8/9 and 9/9?

thanks

Minh

On 16/9/19 5:22 pm, Nguyen Minh Vu wrote:
Hi Minh,

I have a few comments below.

Regards, Vu

On 8/14/19 1:38 PM, Minh Chau wrote:
This patch implements the kRcvBuffOverflow state machine as
described in the README file.
---
  src/mds/mds_tipc_fctrl_intf.cc   |   6 +-
  src/mds/mds_tipc_fctrl_msg.h     |   1 +
  src/mds/mds_tipc_fctrl_portid.cc | 137 ++++++++++++++++++++++++++++++++++-----
  src/mds/mds_tipc_fctrl_portid.h  |   5 +-
  4 files changed, 131 insertions(+), 18 deletions(-)

diff --git a/src/mds/mds_tipc_fctrl_intf.cc b/src/mds/mds_tipc_fctrl_intf.cc
index c2d0922..397114e 100644
--- a/src/mds/mds_tipc_fctrl_intf.cc
+++ b/src/mds/mds_tipc_fctrl_intf.cc
@@ -285,14 +285,16 @@ uint32_t mds_tipc_fctrl_trysend(const uint8_t *buffer, uint16_t len,
      rc = NCSCC_RC_FAILURE;
    } else {
      if (portid->state_ != TipcPortId::State::kDisabled) {
-      portid->Queue(buffer, len);
+      bool sendable = portid->ReceiveCapable(len);
+      portid->Queue(buffer, len, sendable);
        // start txprob timer for the first msg sent out
        // do not start for other states
-      if (portid->state_ == TipcPortId::State::kStartup) {
+      if (sendable && portid->state_ == TipcPortId::State::kStartup) {
          txprob_timer.Start(kBaseTimerInt, tmr_exp_cbk);
          m_MDS_LOG_DBG("FCTRL: Start txprob");
          portid->state_ = TipcPortId::State::kTxProb;
        }
+      if (sendable == false) rc = NCSCC_RC_FAILURE;
      }
    }
  diff --git a/src/mds/mds_tipc_fctrl_msg.h b/src/mds/mds_tipc_fctrl_msg.h
index 69f8048..e6b9662 100644
--- a/src/mds/mds_tipc_fctrl_msg.h
+++ b/src/mds/mds_tipc_fctrl_msg.h
@@ -110,6 +110,7 @@ class DataMessage: public BaseMessage {
    uint8_t* msg_data_{nullptr};
    uint8_t snd_type_{0};
  +  bool is_sent_{true};
    DataMessage() {}
    virtual ~DataMessage();
    void Decode(uint8_t *msg) override;
diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc
index 84ecee9..e762290 100644
--- a/src/mds/mds_tipc_fctrl_portid.cc
+++ b/src/mds/mds_tipc_fctrl_portid.cc
@@ -82,6 +82,23 @@ uint64_t MessageQueue::Erase(uint16_t fseq_from, uint16_t fseq_to) {
    return msg_len;
  }
  +DataMessage* MessageQueue::FirstUnsent() {
+  for (auto it = queue_.begin(); it != queue_.end(); ++it) {
[Vu] Use the shorter version: `for (const auto& it : queue_)`.
+    DataMessage *m = *it;
+    if (m->is_sent_ == false) {
+      return m;
+    }
+  }
+  return nullptr;
+}
+
+void MessageQueue::MarkUnsentFrom(uint16_t fseq) {
+  for (auto it = queue_.begin(); it != queue_.end(); ++it) {
[Vu] Same as the comment above.
+    DataMessage *m = *it;
+    if (m->header_.fseq_ >= fseq) m->is_sent_ = false;
+  }
+}
+
  void MessageQueue::Clear() {
    while (queue_.empty() == false) {
      DataMessage* msg = queue_.front();
@@ -99,7 +116,8 @@ TipcPortId::TipcPortId(struct tipc_portid id, int sock, uint16_t chksize,
  TipcPortId::~TipcPortId() {
    // Fake a TmrChunkAck event to ack all received messages
    ReceiveTmrChunkAck();
-  // clear all msg in sndqueue_
+  // flush all unsent msg in sndqueue_
+  FlushData();
    sndqueue_.Clear();
[Vu] If `sndqueue_.Clear()` must be called every time `FlushData()` is called, should `Clear()` be moved into `FlushData()`?
  }
  @@ -109,6 +127,24 @@ uint64_t TipcPortId::GetUniqueId(struct tipc_portid id) {
    return uid;
  }
  +void TipcPortId::FlushData() {
+  DataMessage* msg = nullptr;
+  do {
+    // find the lowest sequence unsent yet
+    msg = sndqueue_.FirstUnsent();
+    if (msg != nullptr) {
+      Send(msg->msg_data_, msg->header_.msg_len_);
+      msg->is_sent_ = true;
+      m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
+          "FlushData[mseq:%u, mfrag:%u, fseq:%u], "
+          "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
+          id_.node, id_.ref,
+          msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_,
+          sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
+    }
+  } while (msg != nullptr);
+}
+
  uint32_t TipcPortId::Send(uint8_t* data, uint16_t length) {
    struct sockaddr_tipc server_addr;
    ssize_t send_len = 0;
@@ -130,29 +166,49 @@ uint32_t TipcPortId::Send(uint8_t* data, uint16_t length) {
    return rc;
  }
  -uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length) {
+uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length,
+    bool is_sent) {
    uint32_t rc = NCSCC_RC_SUCCESS;
      DataMessage *msg = new DataMessage;
    msg->header_.Decode(const_cast<uint8_t*>(data));
    msg->Decode(const_cast<uint8_t*>(data));
    msg->msg_data_ = new uint8_t[length];
+  msg->is_sent_ = is_sent;
    memcpy(msg->msg_data_, data, length);
    sndqueue_.Queue(msg);
-  ++sndwnd_.send_;
-  sndwnd_.nacked_space_ += length;
-  m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
-      "SndData[mseq:%u, mfrag:%u, fseq:%u, len:%u], "
-      "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
-      id_.node, id_.ref,
-      msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_, length,
-      sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
-
+  if (is_sent) {
+    ++sndwnd_.send_;
+    sndwnd_.nacked_space_ += length;
+    m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
+        "SndData[mseq:%u, mfrag:%u, fseq:%u, len:%u], "
+        "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
+        id_.node, id_.ref,
+        msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_, length,
+        sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
+  } else {
+    ++sndwnd_.send_;
+    m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
+        "QueData[mseq:%u, mfrag:%u, fseq:%u, len:%u], "
+        "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
+        id_.node, id_.ref,
+        msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_, length,
+        sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
+  }
    return rc;
  }
    bool TipcPortId::ReceiveCapable(uint16_t sending_len) {
-  return true;
+  if (state_ == State::kRcvBuffOverflow) return false;
+  if (sndwnd_.nacked_space_ + sending_len < rcv_buf_size_) {
+    return true;
+  } else {
+    state_ = State::kRcvBuffOverflow;
+    m_MDS_LOG_DBG("FCTRL: [node:%x, ref:%u] --> Overflow, %" PRIu64
+        ", %u, %" PRIu64, id_.node, id_.ref, sndwnd_.nacked_space_,
+        sending_len, rcv_buf_size_);
+    return false;
+  }
  }
    void TipcPortId::SendChunkAck(uint16_t fseq, uint16_t svc_id,
@@ -297,7 +353,41 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) {         // remove a number @chksize messages out of sndqueue_ and decrease
      // the nacked_space_ of sender
-    sndwnd_.nacked_space_ -= sndqueue_.Erase(fseq - chksize + 1, fseq);
+    uint64_t acked_bytes = sndqueue_.Erase(fseq - chksize + 1, fseq);
+    sndwnd_.nacked_space_ -= acked_bytes;
+
+    // try to send a few pending msg
+    DataMessage* msg;
[Vu] `msg` should be initialized to `nullptr` here to avoid reading an uninitialized pointer in the `if` check below when the `while` loop never executes.
+    uint64_t resend_bytes = 0;
+    while (resend_bytes < acked_bytes) {
+      // find the lowest sequence unsent yet
+      msg = sndqueue_.FirstUnsent();
+      if (msg == nullptr) {
+        break;
+      } else {
+        if (resend_bytes < acked_bytes) {
+          if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) {
+            sndwnd_.nacked_space_ += msg->header_.msg_len_;
+            msg->is_sent_ = true;
+            resend_bytes += msg->header_.msg_len_;
+            m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
+                "SndQData[fseq:%u, len:%u], "
+                "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
+                id_.node, id_.ref,
+                msg->header_.fseq_, msg->header_.msg_len_,
+                sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
+          }
+        } else {
+          break;
+        }
+      }
+    }
+    // no more unsent message, back to kEnabled
+    if (msg == nullptr && state_ == State::kRcvBuffOverflow) {
+      state_ = State::kEnabled;
+      m_MDS_LOG_DBG("FCTRL: [node:%x, ref:%u] Overflow --> Enabled ",
+          id_.node, id_.ref);
+    }
    } else {
      m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
          "RcvChkAck[fseq:%u, chunk:%u], "
@@ -322,10 +412,26 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag,
          fseq);
      return;
    }
+  if (state_ == State::kRcvBuffOverflow) {
+    m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
+        "RcvNack,  ignore[fseq:%u, state:%u]",
+        id_.node, id_.ref,
+        fseq, (uint8_t)state_);
+    sndqueue_.MarkUnsentFrom(fseq);
+    return;
+  }
+  if (state_ != State::kRcvBuffOverflow) {
+    state_ = State::kRcvBuffOverflow;
+    m_MDS_LOG_DBG("FCTRL: [node:%x, ref:%u] --> Overflow ",
+        id_.node, id_.ref);
+    sndqueue_.MarkUnsentFrom(fseq);
+  }
    DataMessage* msg = sndqueue_.Find(mseq, mfrag);
    if (msg != nullptr) {
      // Resend the msg found
-    Send(msg->msg_data_, msg->header_.msg_len_);
+    if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) {
+      msg->is_sent_ = true;
+    }
      m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
          "RsndData[mseq:%u, mfrag:%u, fseq:%u], "
          "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
@@ -346,7 +452,7 @@ bool TipcPortId::ReceiveTmrTxProb(uint8_t max_txprob) {
    if (state_ == State::kDisabled ||
        sndwnd_.acked_ > 1 ||
        rcvwnd_.rcv_ > 1) return restart_txprob;
-  if (state_ == State::kTxProb) {
+  if (state_ == State::kTxProb || state_ == State::kRcvBuffOverflow) {
      txprob_cnt_++;
      if (txprob_cnt_ >= max_txprob) {
        state_ = State::kDisabled;
@@ -358,6 +464,7 @@ bool TipcPortId::ReceiveTmrTxProb(uint8_t max_txprob) {
      // at kDisabled state, clear all message in sndqueue_,
      // receiver is at old mds version
      if (state_ == State::kDisabled) {
+      FlushData();
        sndqueue_.Clear();
      }
  diff --git a/src/mds/mds_tipc_fctrl_portid.h b/src/mds/mds_tipc_fctrl_portid.h
index 0adb9dd..119c012 100644
--- a/src/mds/mds_tipc_fctrl_portid.h
+++ b/src/mds/mds_tipc_fctrl_portid.h
@@ -35,6 +35,8 @@ class MessageQueue {
    uint64_t Erase(uint16_t fseq_from, uint16_t fseq_to);
    uint64_t Size() const { return queue_.size(); }
    void Clear();
+  DataMessage* FirstUnsent();
+  void MarkUnsentFrom(uint16_t fseq);
   private:
    std::deque<DataMessage*> queue_;
  };
@@ -73,8 +75,9 @@ class TipcPortId {
    void ReceiveNack(uint32_t mseq, uint16_t mfrag, uint16_t fseq);
    bool ReceiveTmrTxProb(uint8_t max_txprob);
    void ReceiveTmrChunkAck();
+  void FlushData();
    uint32_t Send(uint8_t* data, uint16_t length);
-  uint32_t Queue(const uint8_t* data, uint16_t length);
+  uint32_t Queue(const uint8_t* data, uint16_t length, bool is_sent);
      uint16_t svc_cnt_{1};  // number of service subscribed on this portid




_______________________________________________
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel

Reply via email to