Hi Vu,

I agree with your suggestion; it will be done in #3074.

Thanks

Minh

On 17/9/19 1:39 pm, Nguyen Minh Vu wrote:
Hi Minh,

I have a minor comment below.

Regards, Vu

On 8/14/19 1:38 PM, Minh Chau wrote:
This patch applies serial number arithmetic, as described in RFC 1982,
to the flow control sequence number.

This is only a temporary patch; a proper one could be made in /base
as a template that also covers other types, e.g. uint32. mds would then
reuse it from /base.
---
  src/mds/mds_tipc_fctrl_portid.cc | 53 +++++++++++++--------------
  src/mds/mds_tipc_fctrl_portid.h  | 77 ++++++++++++++++++++++++++++++++++++----
  2 files changed, 97 insertions(+), 33 deletions(-)

diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc
index e762290..365d72f 100644
--- a/src/mds/mds_tipc_fctrl_portid.cc
+++ b/src/mds/mds_tipc_fctrl_portid.cc
@@ -66,12 +66,12 @@ DataMessage* MessageQueue::Find(uint32_t mseq, uint16_t mfrag) {
    return nullptr;
  }
  -uint64_t MessageQueue::Erase(uint16_t fseq_from, uint16_t fseq_to) {
+uint64_t MessageQueue::Erase(Seq16 fseq_from, Seq16 fseq_to) {
    uint64_t msg_len = 0;
    for (auto it = queue_.begin(); it != queue_.end();) {
      DataMessage *m = *it;
-    if (fseq_from <= m->header_.fseq_ &&
-        m->header_.fseq_ <= fseq_to) {
+    if (fseq_from <= Seq16(m->header_.fseq_) &&
+        Seq16(m->header_.fseq_) <= fseq_to) {
        msg_len += m->header_.msg_len_;
        it = queue_.erase(it);
        delete m;
@@ -92,10 +92,10 @@ DataMessage* MessageQueue::FirstUnsent() {
    return nullptr;
  }
  -void MessageQueue::MarkUnsentFrom(uint16_t fseq) {
+void MessageQueue::MarkUnsentFrom(Seq16 fseq) {
    for (auto it = queue_.begin(); it != queue_.end(); ++it) {
      DataMessage *m = *it;
-    if (m->header_.fseq_ >= fseq) m->is_sent_ = false;
+    if (Seq16(m->header_.fseq_) >= fseq) m->is_sent_ = false;
    }
  }
  @@ -140,7 +140,7 @@ void TipcPortId::FlushData() {
            "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
            id_.node, id_.ref,
            msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_,
-          sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
+          sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
      }
    } while (msg != nullptr);
  }
@@ -185,7 +185,7 @@ uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length,
          "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
          id_.node, id_.ref,
          msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_, length,
-        sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
+        sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
    } else {
      ++sndwnd_.send_;
      m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
@@ -193,7 +193,7 @@ uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length,
          "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
          id_.node, id_.ref,
          msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_, length,
-        sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
+        sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
    }
    return rc;
  }
@@ -248,13 +248,13 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
          txprob_cnt_, (uint8_t)state_);
    }
    // update receiver sequence window
-  if (rcvwnd_.acked_ < fseq && rcvwnd_.rcv_ + 1 == fseq) {
+  if (rcvwnd_.acked_ < Seq16(fseq) && rcvwnd_.rcv_ + Seq16(1) == Seq16(fseq)) {
      m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
          "RcvData[mseq:%u, mfrag:%u, fseq:%u], "
          "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "]",
          id_.node, id_.ref,
          mseq, mfrag, fseq,
-        rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_);
+        rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_);
        ++rcvwnd_.rcv_;
      if (rcvwnd_.rcv_ - rcvwnd_.acked_ >= chunk_size_) {
@@ -279,7 +279,7 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
      // It is not used for now, so ignore it.
        // check for transmission error
-    if (rcvwnd_.rcv_ + 1 < fseq) {
+    if (rcvwnd_.rcv_ + Seq16(1) < Seq16(fseq)) {
        if (rcvwnd_.rcv_ == 0 && rcvwnd_.acked_ == 0) {
          // peer does not realize that this portid reset
          m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
@@ -288,7 +288,7 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
              "Warning[portid reset]",
              id_.node, id_.ref,
              mseq, mfrag, fseq,
-            rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_);
+            rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_);
            rcvwnd_.rcv_ = fseq;
        } else {
@@ -300,10 +300,10 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
              "Error[msg loss]",
              id_.node, id_.ref,
              mseq, mfrag, fseq,
-            rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_);
+            rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_);
        }
      }
-    if (fseq <= rcvwnd_.acked_) {
+    if (Seq16(fseq) <= rcvwnd_.acked_) {
        rc = NCSCC_RC_FAILURE;
        // unexpected retransmission
        m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
@@ -312,7 +312,7 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
            "Error[unexpected retransmission]",
            id_.node, id_.ref,
            mseq, mfrag, fseq,
-          rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_);
+          rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_);
      }
    }
    return rc;
@@ -338,14 +338,14 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) {
          txprob_cnt_, (uint8_t)state_);
    }
    // update sender sequence window
-  if (sndwnd_.acked_ < fseq && fseq < sndwnd_.send_) {
+  if (sndwnd_.acked_ < Seq16(fseq) && Seq16(fseq) < sndwnd_.send_) {
      m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
          "RcvChkAck[fseq:%u, chunk:%u], "
          "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "], "
          "queue[size:%" PRIu64 "]",
          id_.node, id_.ref,
          fseq, chksize,
-        sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_,
+        sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_,
          sndqueue_.Size());
        // fast forward the sndwnd_.acked_ sequence to fseq
@@ -353,7 +353,8 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) {         // remove a number @chksize messages out of sndqueue_ and decrease
      // the nacked_space_ of sender
-    uint64_t acked_bytes = sndqueue_.Erase(fseq - chksize + 1, fseq);
+    uint64_t acked_bytes = sndqueue_.Erase(Seq16(fseq) - (chksize-1),
+        Seq16(fseq));
      sndwnd_.nacked_space_ -= acked_bytes;
        // try to send a few pending msg
@@ -375,7 +376,7 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) {
                  "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
                  id_.node, id_.ref,
                  msg->header_.fseq_, msg->header_.msg_len_,
-                sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
+                sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
            }
          } else {
            break;
@@ -396,7 +397,7 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) {
          "Error[msg disordered]",
          id_.node, id_.ref,
          fseq, chksize,
-        sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_,
+        sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_,
          sndqueue_.Size());
    }
  }
@@ -417,14 +418,14 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag,
          "RcvNack,  ignore[fseq:%u, state:%u]",
          id_.node, id_.ref,
          fseq, (uint8_t)state_);
-    sndqueue_.MarkUnsentFrom(fseq);
+    sndqueue_.MarkUnsentFrom(Seq16(fseq));
      return;
    }
    if (state_ != State::kRcvBuffOverflow) {
      state_ = State::kRcvBuffOverflow;
      m_MDS_LOG_DBG("FCTRL: [node:%x, ref:%u] --> Overflow ",
          id_.node, id_.ref);
-    sndqueue_.MarkUnsentFrom(fseq);
+    sndqueue_.MarkUnsentFrom(Seq16(fseq));
    }
    DataMessage* msg = sndqueue_.Find(mseq, mfrag);
    if (msg != nullptr) {
@@ -437,7 +438,7 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag,
          "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
          id_.node, id_.ref,
          mseq, mfrag, fseq,
-        sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
+        sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
    } else {
      m_MDS_LOG_ERR("FCTRL: [me] --> [node:%x, ref:%u], "
          "RsndData[mseq:%u, mfrag:%u, fseq:%u], "
@@ -450,8 +451,8 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag,
  bool TipcPortId::ReceiveTmrTxProb(uint8_t max_txprob) {
    bool restart_txprob = false;
    if (state_ == State::kDisabled ||
-      sndwnd_.acked_ > 1 ||
-      rcvwnd_.rcv_ > 1) return restart_txprob;
+      sndwnd_.acked_ > Seq16(1) ||
+      rcvwnd_.rcv_ > Seq16(1)) return restart_txprob;
    if (state_ == State::kTxProb || state_ == State::kRcvBuffOverflow) {
      txprob_cnt_++;
      if (txprob_cnt_ >= max_txprob) {
@@ -484,7 +485,7 @@ void TipcPortId::ReceiveTmrChunkAck() {
          "ChkAckExp",
          id_.node, id_.ref);
      // send ack for @chksize msgs starting from rcvwnd_.rcv_
-    SendChunkAck(rcvwnd_.rcv_, 0, chksize);
+    SendChunkAck(rcvwnd_.rcv_.v(), 0, chksize);
      rcvwnd_.acked_ = rcvwnd_.rcv_;
    }
  }
diff --git a/src/mds/mds_tipc_fctrl_portid.h b/src/mds/mds_tipc_fctrl_portid.h
index 119c012..cf2daaa 100644
--- a/src/mds/mds_tipc_fctrl_portid.h
+++ b/src/mds/mds_tipc_fctrl_portid.h
@@ -28,15 +28,78 @@
    namespace mds {
  +class Seq16 {  // 16-bit sequence number with wrap-around (serial number) comparison and arithmetic
+ public:
+#define SEQ16_MAX 65536  // modulus applied by the arithmetic below (2^16)
+#define SEQ16_SPACE 32768  // half of SEQ16_MAX; threshold used to decide which of two values is ahead
+  uint16_t value_;  // raw sequence value; public in this version of the class
+  explicit Seq16(uint16_t v) {  // explicit: a plain integer is not implicitly converted to Seq16
+    value_ = uint16_t((uint32_t)v % SEQ16_MAX);  // a uint16_t is always < SEQ16_MAX, so the modulo leaves v unchanged
+  }
+  uint16_t v() {  // returns the raw value (used for logging and for sending on the wire in the .cc hunks)
+    return value_;
+  }
+  Seq16 operator + (const Seq16 add) const {  // sum of the two raw values, wrapped modulo SEQ16_MAX
+    return Seq16(((uint32_t)value_ + (uint32_t)add.value_) % SEQ16_MAX);
+  }
+
+  int16_t operator - (const Seq16 sub) const {  // signed distance from sub to *this
+    if (value_ < sub.value_ && (sub.value_ - value_ < SEQ16_SPACE)) {  // *this is behind sub by less than SEQ16_SPACE
+      return value_ - sub.value_;  // negative result
+    }
+    if (value_ > sub.value_ && (value_ - sub.value_ > SEQ16_SPACE)) {  // raw gap above SEQ16_SPACE: operator < treats *this as behind sub
+      return (int32_t)value_ + SEQ16_MAX - (int32_t)sub.value_;  // this sum exceeds the int16_t range and is narrowed by the return type
+    }
+    if (value_ < sub.value_ && (sub.value_ - value_ > SEQ16_SPACE)) {  // raw gap above SEQ16_SPACE: operator > treats *this as ahead of sub
+      return (int32_t)value_ + SEQ16_MAX - (int32_t)sub.value_;  // wrapped forward distance, between 1 and SEQ16_SPACE - 1
+    }
+    if (value_ > sub.value_ && (value_ - sub.value_ < SEQ16_SPACE)) {  // *this is ahead of sub by less than SEQ16_SPACE
+      return value_ - sub.value_;  // positive result
+    }
+    return 0;  // reached when the values are equal or exactly SEQ16_SPACE apart
+  }
+  Seq16 operator - (const uint16_t sub) const {  // step back by sub, wrapping modulo SEQ16_MAX
+    return Seq16(((uint32_t)value_ + 65536 - sub) % SEQ16_MAX);  // adding 65536 first keeps the unsigned intermediate from underflowing
+  }
+  void operator ++() {  // prefix increment; returns void, so the result cannot be used in an expression
+    value_ = (value_ + 1) % SEQ16_MAX;  // 65535 wraps to 0
+  }
+  void operator = (const uint16_t v) {  // assignment from a raw value; returns void, so assignments cannot be chained
+    value_ = v % SEQ16_MAX;  // the modulo leaves a uint16_t unchanged
+  }
+  bool operator == (const Seq16& seq) const {  // equality on the raw values
+    return value_ == seq.value_;
+  }
+  bool operator == (uint16_t val) const {  // equality against a raw integer
+    return value_ == val;
+  }
+  bool operator <= (const Seq16& seq) {  // NOTE(review): not const-qualified, so it cannot be called on a const Seq16
+    return *this == seq || *this < seq;
+  }
+  bool operator < (const Seq16& seq) {  // wrap-around "precedes" test; not const-qualified
+    if (value_ < seq.value_ && (seq.value_ - value_ < SEQ16_SPACE)) return true; +    if (value_ > seq.value_ && (value_ - seq.value_ > SEQ16_SPACE)) return true;
+    return false;  // also false when the values are equal or exactly SEQ16_SPACE apart
+  }
+  bool operator > (const Seq16& seq) {  // wrap-around "follows" test; not const-qualified
+    if (value_ < seq.value_ && (seq.value_ - value_ > SEQ16_SPACE)) return true; +    if (value_ > seq.value_ && (value_ - seq.value_ < SEQ16_SPACE)) return true;
+    return false;  // also false when the values are equal or exactly SEQ16_SPACE apart
+  }
+  bool operator >= (const Seq16& seq) {  // NOTE(review): not const-qualified, so it cannot be called on a const Seq16
+    return *this == seq || *this > seq;
+  }
+};
[Vu] Can we make this class generic by using a class template?
// Generic wrap-around sequence number over an unsigned integer type T,
// compared with serial number arithmetic (RFC 1982).
//
// Two values are ordered by the forward distance between them, computed
// modulo 2^bits(T): `a < b` holds when b is ahead of a by more than zero and
// less than half of the sequence space. Values exactly half the space apart
// are unordered (neither `a <= b` nor `b <= a`), as in RFC 1982.
template <typename T>
class Seq {
public:
    // Wraps a raw sequence value. Every value of an unsigned T is already a
    // valid sequence number, so no modulo reduction is applied (reducing by
    // numeric_limits<T>::max() would wrongly map the largest value to 0).
    explicit Seq(T value) : value_(value) {}

    // Returns the raw sequence value, e.g. for logging or for the wire.
    T v() const { return value_; }

    // True when both objects hold the same raw value.
    bool operator == (const Seq<T>& seq) const {
        return value_ == seq.value_;
    }

    // True when *this precedes seq in wrap-around order.
    bool operator < (const Seq<T>& seq) const {
        // The cast folds the (possibly int-promoted, possibly negative)
        // difference back into T, giving the forward distance modulo 2^bits.
        const T distance = static_cast<T>(seq.value_ - value_);
        return distance != 0 && distance < seq_space_;
    }

    // True when *this equals seq or precedes it in wrap-around order.
    bool operator <= (const Seq<T>& seq) const {
        return *this == seq || *this < seq;
    }

private:
    static_assert(std::numeric_limits<T>::is_integer &&
                  !std::numeric_limits<T>::is_signed,
                  "Seq<T> requires an unsigned integer type");

    T value_;
    // Largest raw value representable in T (the sequence space is max + 1).
    constexpr static const T seq_max_ = std::numeric_limits<T>::max();
    // Half of the sequence space, e.g. 32768 for uint16_t.
    constexpr static const T seq_space_ = seq_max_ / 2 + 1;
};

using Seq16 = Seq<uint16_t>;
using Seq32 = Seq<uint32_t>;

+
  class MessageQueue {
   public:
    void Queue(DataMessage* msg);
    DataMessage* Find(uint32_t mseq, uint16_t mfrag);
-  uint64_t Erase(uint16_t fseq_from, uint16_t fseq_to);
+  uint64_t Erase(Seq16 fseq_from, Seq16 fseq_to);
    uint64_t Size() const { return queue_.size(); }
    void Clear();
    DataMessage* FirstUnsent();
-  void MarkUnsentFrom(uint16_t fseq);
+  void MarkUnsentFrom(Seq16 fseq);
   private:
    std::deque<DataMessage*> queue_;
  };
@@ -66,7 +129,7 @@ class TipcPortId {
    ~TipcPortId();
    static uint64_t GetUniqueId(struct tipc_portid id);
    int GetSock() const { return bsrsock_; }
-  uint16_t GetCurrentSeq() { return sndwnd_.send_; }
+  uint16_t GetCurrentSeq() { return (uint16_t)sndwnd_.send_.v(); }
    bool ReceiveCapable(uint16_t sending_len);
    void ReceiveChunkAck(uint16_t fseq, uint16_t chunk_size);
    void SendChunkAck(uint16_t fseq, uint16_t svc_id, uint16_t chunk_size);
@@ -92,16 +155,16 @@ class TipcPortId {
      struct sndwnd {
      // sender sequence window
-    uint16_t acked_{0};  // last sequence has been acked by receiver
-    uint16_t send_{1};   // next sequence to be sent
+    Seq16 acked_{0};  // last sequence has been acked by receiver
+    Seq16 send_{1};   // next sequence to be sent
      uint64_t nacked_space_{0};  // total bytes are sent but not acked
    };
    struct sndwnd sndwnd_;
      struct rcvwnd {
      // receiver sequence window
-    uint16_t acked_{0};  // last sequence has been acked to sender
-    uint16_t rcv_{0};    // last sequence has been received
+    Seq16 acked_{0};  // last sequence has been acked to sender
+    Seq16 rcv_{0};    // last sequence has been received
      uint64_t nacked_space_{0};  // total bytes has not been acked
    };
    struct rcvwnd rcvwnd_;




_______________________________________________
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel

Reply via email to