This patch applies serial number arithmetic, as defined in RFC 1982,
to the flow control sequence numbers.

This is only a temporary patch; a proper implementation could be made
in /base as a template that also supports other types, e.g. uint32.
mds would then reuse it from /base.
---
 src/mds/mds_tipc_fctrl_portid.cc | 53 +++++++++++++--------------
 src/mds/mds_tipc_fctrl_portid.h  | 77 ++++++++++++++++++++++++++++++++++++----
 2 files changed, 97 insertions(+), 33 deletions(-)

diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc
index e762290..365d72f 100644
--- a/src/mds/mds_tipc_fctrl_portid.cc
+++ b/src/mds/mds_tipc_fctrl_portid.cc
@@ -66,12 +66,12 @@ DataMessage* MessageQueue::Find(uint32_t mseq, uint16_t mfrag) {
   return nullptr;
 }
 
-uint64_t MessageQueue::Erase(uint16_t fseq_from, uint16_t fseq_to) {
+uint64_t MessageQueue::Erase(Seq16 fseq_from, Seq16 fseq_to) {
   uint64_t msg_len = 0;
   for (auto it = queue_.begin(); it != queue_.end();) {
     DataMessage *m = *it;
-    if (fseq_from <= m->header_.fseq_ &&
-        m->header_.fseq_ <= fseq_to) {
+    if (fseq_from <= Seq16(m->header_.fseq_) &&
+        Seq16(m->header_.fseq_) <= fseq_to) {
       msg_len += m->header_.msg_len_;
       it = queue_.erase(it);
       delete m;
@@ -92,10 +92,10 @@ DataMessage* MessageQueue::FirstUnsent() {
   return nullptr;
 }
 
-void MessageQueue::MarkUnsentFrom(uint16_t fseq) {
+void MessageQueue::MarkUnsentFrom(Seq16 fseq) {
   for (auto it = queue_.begin(); it != queue_.end(); ++it) {
     DataMessage *m = *it;
-    if (m->header_.fseq_ >= fseq) m->is_sent_ = false;
+    if (Seq16(m->header_.fseq_) >= fseq) m->is_sent_ = false;
   }
 }
 
@@ -140,7 +140,7 @@ void TipcPortId::FlushData() {
           "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
           id_.node, id_.ref,
           msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_,
-          sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
+          sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
     }
   } while (msg != nullptr);
 }
@@ -185,7 +185,7 @@ uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length,
         "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
         id_.node, id_.ref,
         msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_, length,
-        sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
+        sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
   } else {
     ++sndwnd_.send_;
     m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
@@ -193,7 +193,7 @@ uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length,
         "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
         id_.node, id_.ref,
         msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_, length,
-        sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
+        sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
   }
   return rc;
 }
@@ -248,13 +248,13 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
         txprob_cnt_, (uint8_t)state_);
   }
   // update receiver sequence window
-  if (rcvwnd_.acked_ < fseq && rcvwnd_.rcv_ + 1 == fseq) {
+  if (rcvwnd_.acked_ < Seq16(fseq) && rcvwnd_.rcv_ + Seq16(1) == Seq16(fseq)) {
     m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
         "RcvData[mseq:%u, mfrag:%u, fseq:%u], "
         "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "]",
         id_.node, id_.ref,
         mseq, mfrag, fseq,
-        rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_);
+        rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_);
 
     ++rcvwnd_.rcv_;
     if (rcvwnd_.rcv_ - rcvwnd_.acked_ >= chunk_size_) {
@@ -279,7 +279,7 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
     // It is not used for now, so ignore it.
 
     // check for transmission error
-    if (rcvwnd_.rcv_ + 1 < fseq) {
+    if (rcvwnd_.rcv_ + Seq16(1) < Seq16(fseq)) {
       if (rcvwnd_.rcv_ == 0 && rcvwnd_.acked_ == 0) {
         // peer does not realize that this portid reset
         m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
@@ -288,7 +288,7 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
             "Warning[portid reset]",
             id_.node, id_.ref,
             mseq, mfrag, fseq,
-            rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_);
+            rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_);
 
         rcvwnd_.rcv_ = fseq;
       } else {
@@ -300,10 +300,10 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
             "Error[msg loss]",
             id_.node, id_.ref,
             mseq, mfrag, fseq,
-            rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_);
+            rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_);
       }
     }
-    if (fseq <= rcvwnd_.acked_) {
+    if (Seq16(fseq) <= rcvwnd_.acked_) {
       rc = NCSCC_RC_FAILURE;
       // unexpected retransmission
       m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
@@ -312,7 +312,7 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
           "Error[unexpected retransmission]",
           id_.node, id_.ref,
           mseq, mfrag, fseq,
-          rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_);
+          rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_);
     }
   }
   return rc;
@@ -338,14 +338,14 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) {
         txprob_cnt_, (uint8_t)state_);
   }
   // update sender sequence window
-  if (sndwnd_.acked_ < fseq && fseq < sndwnd_.send_) {
+  if (sndwnd_.acked_ < Seq16(fseq) && Seq16(fseq) < sndwnd_.send_) {
     m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
         "RcvChkAck[fseq:%u, chunk:%u], "
         "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "], "
         "queue[size:%" PRIu64 "]",
         id_.node, id_.ref,
         fseq, chksize,
-        sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_,
+        sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_,
         sndqueue_.Size());
 
     // fast forward the sndwnd_.acked_ sequence to fseq
@@ -353,7 +353,8 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) {
 
     // remove a number @chksize messages out of sndqueue_ and decrease
     // the nacked_space_ of sender
-    uint64_t acked_bytes = sndqueue_.Erase(fseq - chksize + 1, fseq);
+    uint64_t acked_bytes = sndqueue_.Erase(Seq16(fseq) - (chksize-1),
+        Seq16(fseq));
     sndwnd_.nacked_space_ -= acked_bytes;
 
     // try to send a few pending msg
@@ -375,7 +376,7 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) {
                 "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
                 id_.node, id_.ref,
                 msg->header_.fseq_, msg->header_.msg_len_,
-                sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
+                sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
           }
         } else {
           break;
@@ -396,7 +397,7 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) {
         "Error[msg disordered]",
         id_.node, id_.ref,
         fseq, chksize,
-        sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_,
+        sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_,
         sndqueue_.Size());
   }
 }
@@ -417,14 +418,14 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag,
         "RcvNack,  ignore[fseq:%u, state:%u]",
         id_.node, id_.ref,
         fseq, (uint8_t)state_);
-    sndqueue_.MarkUnsentFrom(fseq);
+    sndqueue_.MarkUnsentFrom(Seq16(fseq));
     return;
   }
   if (state_ != State::kRcvBuffOverflow) {
     state_ = State::kRcvBuffOverflow;
     m_MDS_LOG_DBG("FCTRL: [node:%x, ref:%u] --> Overflow ",
         id_.node, id_.ref);
-    sndqueue_.MarkUnsentFrom(fseq);
+    sndqueue_.MarkUnsentFrom(Seq16(fseq));
   }
   DataMessage* msg = sndqueue_.Find(mseq, mfrag);
   if (msg != nullptr) {
@@ -437,7 +438,7 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag,
         "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
         id_.node, id_.ref,
         mseq, mfrag, fseq,
-        sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
+        sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
   } else {
     m_MDS_LOG_ERR("FCTRL: [me] --> [node:%x, ref:%u], "
         "RsndData[mseq:%u, mfrag:%u, fseq:%u], "
@@ -450,8 +451,8 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag,
 bool TipcPortId::ReceiveTmrTxProb(uint8_t max_txprob) {
   bool restart_txprob = false;
   if (state_ == State::kDisabled ||
-      sndwnd_.acked_ > 1 ||
-      rcvwnd_.rcv_ > 1) return restart_txprob;
+      sndwnd_.acked_ > Seq16(1) ||
+      rcvwnd_.rcv_ > Seq16(1)) return restart_txprob;
   if (state_ == State::kTxProb || state_ == State::kRcvBuffOverflow) {
     txprob_cnt_++;
     if (txprob_cnt_ >= max_txprob) {
@@ -484,7 +485,7 @@ void TipcPortId::ReceiveTmrChunkAck() {
         "ChkAckExp",
         id_.node, id_.ref);
     // send ack for @chksize msgs starting from rcvwnd_.rcv_
-    SendChunkAck(rcvwnd_.rcv_, 0, chksize);
+    SendChunkAck(rcvwnd_.rcv_.v(), 0, chksize);
     rcvwnd_.acked_ = rcvwnd_.rcv_;
   }
 }
diff --git a/src/mds/mds_tipc_fctrl_portid.h b/src/mds/mds_tipc_fctrl_portid.h
index 119c012..cf2daaa 100644
--- a/src/mds/mds_tipc_fctrl_portid.h
+++ b/src/mds/mds_tipc_fctrl_portid.h
@@ -28,15 +28,78 @@
 
 namespace mds {
 
+class Seq16 {
+ public:
+#define SEQ16_MAX 65536
+#define SEQ16_SPACE 32768
+  uint16_t value_;
+  explicit Seq16(uint16_t v) {
+    value_ = uint16_t((uint32_t)v % SEQ16_MAX);
+  }
+  uint16_t v() {
+    return value_;
+  }
+  Seq16 operator + (const Seq16 add) const {
+    return Seq16(((uint32_t)value_ + (uint32_t)add.value_) % SEQ16_MAX);
+  }
+
+  int16_t operator - (const Seq16 sub) const {
+    if (value_ < sub.value_ && (sub.value_ - value_ < SEQ16_SPACE)) {
+      return value_ - sub.value_;
+    }
+    if (value_ > sub.value_ && (value_ - sub.value_ > SEQ16_SPACE)) {
+      return (int32_t)value_ + SEQ16_MAX -  (int32_t)sub.value_;
+    }
+    if (value_ < sub.value_ && (sub.value_ - value_ > SEQ16_SPACE)) {
+      return (int32_t)value_ + SEQ16_MAX -  (int32_t)sub.value_;
+    }
+    if (value_ > sub.value_ && (value_ - sub.value_ < SEQ16_SPACE)) {
+      return value_ - sub.value_;
+    }
+    return 0;
+  }
+  Seq16 operator - (const uint16_t sub) const {
+    return Seq16(((uint32_t)value_ + 65536 - sub) % SEQ16_MAX);
+  }
+  void operator ++() {
+    value_ = (value_ + 1) % SEQ16_MAX;
+  }
+  void operator = (const uint16_t v) {
+    value_ = v % SEQ16_MAX;
+  }
+  bool operator == (const Seq16& seq) const {
+    return value_ == seq.value_;
+  }
+  bool operator == (uint16_t val) const {
+    return value_ == val;
+  }
+  bool operator <= (const Seq16& seq) {
+    return *this == seq || *this < seq;
+  }
+  bool operator < (const Seq16& seq) {
+    if (value_ < seq.value_ && (seq.value_ - value_ < SEQ16_SPACE)) return true;
+    if (value_ > seq.value_ && (value_ - seq.value_ > SEQ16_SPACE)) return true;
+    return false;
+  }
+  bool operator > (const Seq16& seq) {
+    if (value_ < seq.value_ && (seq.value_ - value_ > SEQ16_SPACE)) return true;
+    if (value_ > seq.value_ && (value_ - seq.value_ < SEQ16_SPACE)) return true;
+    return false;
+  }
+  bool operator >= (const Seq16& seq) {
+    return *this == seq || *this > seq;
+  }
+};
+
 class MessageQueue {
  public:
   void Queue(DataMessage* msg);
   DataMessage* Find(uint32_t mseq, uint16_t mfrag);
-  uint64_t Erase(uint16_t fseq_from, uint16_t fseq_to);
+  uint64_t Erase(Seq16 fseq_from, Seq16 fseq_to);
   uint64_t Size() const { return queue_.size(); }
   void Clear();
   DataMessage* FirstUnsent();
-  void MarkUnsentFrom(uint16_t fseq);
+  void MarkUnsentFrom(Seq16 fseq);
  private:
   std::deque<DataMessage*> queue_;
 };
@@ -66,7 +129,7 @@ class TipcPortId {
   ~TipcPortId();
   static uint64_t GetUniqueId(struct tipc_portid id);
   int GetSock() const { return bsrsock_; }
-  uint16_t GetCurrentSeq() { return sndwnd_.send_; }
+  uint16_t GetCurrentSeq() { return (uint16_t)sndwnd_.send_.v(); }
   bool ReceiveCapable(uint16_t sending_len);
   void ReceiveChunkAck(uint16_t fseq, uint16_t chunk_size);
   void SendChunkAck(uint16_t fseq, uint16_t svc_id, uint16_t chunk_size);
@@ -92,16 +155,16 @@ class TipcPortId {
 
   struct sndwnd {
     // sender sequence window
-    uint16_t acked_{0};  // last sequence has been acked by receiver
-    uint16_t send_{1};   // next sequence to be sent
+    Seq16 acked_{0};  // last sequence has been acked by receiver
+    Seq16 send_{1};   // next sequence to be sent
     uint64_t nacked_space_{0};  // total bytes are sent but not acked
   };
   struct sndwnd sndwnd_;
 
   struct rcvwnd {
     // receiver sequence window
-    uint16_t acked_{0};  // last sequence has been acked to sender
-    uint16_t rcv_{0};    // last sequence has been received
+    Seq16 acked_{0};  // last sequence has been acked to sender
+    Seq16 rcv_{0};    // last sequence has been received
     uint64_t nacked_space_{0};  // total bytes has not been acked
   };
   struct rcvwnd rcvwnd_;
-- 
2.7.4



_______________________________________________
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel

Reply via email to