This patch applies serial number arithmetic, as defined in RFC 1982, to the flow control sequence number.
This is only temporary patch, a proper one could be made in /base with template for others type, e.g uint32. Then mds reuses it from /base. --- src/mds/mds_tipc_fctrl_portid.cc | 53 +++++++++++++-------------- src/mds/mds_tipc_fctrl_portid.h | 77 ++++++++++++++++++++++++++++++++++++---- 2 files changed, 97 insertions(+), 33 deletions(-) diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc index e762290..365d72f 100644 --- a/src/mds/mds_tipc_fctrl_portid.cc +++ b/src/mds/mds_tipc_fctrl_portid.cc @@ -66,12 +66,12 @@ DataMessage* MessageQueue::Find(uint32_t mseq, uint16_t mfrag) { return nullptr; } -uint64_t MessageQueue::Erase(uint16_t fseq_from, uint16_t fseq_to) { +uint64_t MessageQueue::Erase(Seq16 fseq_from, Seq16 fseq_to) { uint64_t msg_len = 0; for (auto it = queue_.begin(); it != queue_.end();) { DataMessage *m = *it; - if (fseq_from <= m->header_.fseq_ && - m->header_.fseq_ <= fseq_to) { + if (fseq_from <= Seq16(m->header_.fseq_) && + Seq16(m->header_.fseq_) <= fseq_to) { msg_len += m->header_.msg_len_; it = queue_.erase(it); delete m; @@ -92,10 +92,10 @@ DataMessage* MessageQueue::FirstUnsent() { return nullptr; } -void MessageQueue::MarkUnsentFrom(uint16_t fseq) { +void MessageQueue::MarkUnsentFrom(Seq16 fseq) { for (auto it = queue_.begin(); it != queue_.end(); ++it) { DataMessage *m = *it; - if (m->header_.fseq_ >= fseq) m->is_sent_ = false; + if (Seq16(m->header_.fseq_) >= fseq) m->is_sent_ = false; } } @@ -140,7 +140,7 @@ void TipcPortId::FlushData() { "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", id_.node, id_.ref, msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_, - sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_); + sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_); } } while (msg != nullptr); } @@ -185,7 +185,7 @@ uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length, "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", id_.node, id_.ref, msg->header_.mseq_, msg->header_.mfrag_, 
msg->header_.fseq_, length, - sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_); + sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_); } else { ++sndwnd_.send_; m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], " @@ -193,7 +193,7 @@ uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length, "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", id_.node, id_.ref, msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_, length, - sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_); + sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_); } return rc; } @@ -248,13 +248,13 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag, txprob_cnt_, (uint8_t)state_); } // update receiver sequence window - if (rcvwnd_.acked_ < fseq && rcvwnd_.rcv_ + 1 == fseq) { + if (rcvwnd_.acked_ < Seq16(fseq) && rcvwnd_.rcv_ + Seq16(1) == Seq16(fseq)) { m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], " "RcvData[mseq:%u, mfrag:%u, fseq:%u], " "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "]", id_.node, id_.ref, mseq, mfrag, fseq, - rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_); + rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_); ++rcvwnd_.rcv_; if (rcvwnd_.rcv_ - rcvwnd_.acked_ >= chunk_size_) { @@ -279,7 +279,7 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag, // It is not used for now, so ignore it. 
// check for transmission error - if (rcvwnd_.rcv_ + 1 < fseq) { + if (rcvwnd_.rcv_ + Seq16(1) < Seq16(fseq)) { if (rcvwnd_.rcv_ == 0 && rcvwnd_.acked_ == 0) { // peer does not realize that this portid reset m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], " @@ -288,7 +288,7 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag, "Warning[portid reset]", id_.node, id_.ref, mseq, mfrag, fseq, - rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_); + rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_); rcvwnd_.rcv_ = fseq; } else { @@ -300,10 +300,10 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag, "Error[msg loss]", id_.node, id_.ref, mseq, mfrag, fseq, - rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_); + rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_); } } - if (fseq <= rcvwnd_.acked_) { + if (Seq16(fseq) <= rcvwnd_.acked_) { rc = NCSCC_RC_FAILURE; // unexpected retransmission m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], " @@ -312,7 +312,7 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag, "Error[unexpected retransmission]", id_.node, id_.ref, mseq, mfrag, fseq, - rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_); + rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_); } } return rc; @@ -338,14 +338,14 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) { txprob_cnt_, (uint8_t)state_); } // update sender sequence window - if (sndwnd_.acked_ < fseq && fseq < sndwnd_.send_) { + if (sndwnd_.acked_ < Seq16(fseq) && Seq16(fseq) < sndwnd_.send_) { m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], " "RcvChkAck[fseq:%u, chunk:%u], " "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "], " "queue[size:%" PRIu64 "]", id_.node, id_.ref, fseq, chksize, - sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_, + sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_, sndqueue_.Size()); // fast forward the sndwnd_.acked_ sequence to fseq @@ -353,7 +353,8 @@ void 
TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) { // remove a number @chksize messages out of sndqueue_ and decrease // the nacked_space_ of sender - uint64_t acked_bytes = sndqueue_.Erase(fseq - chksize + 1, fseq); + uint64_t acked_bytes = sndqueue_.Erase(Seq16(fseq) - (chksize-1), + Seq16(fseq)); sndwnd_.nacked_space_ -= acked_bytes; // try to send a few pending msg @@ -375,7 +376,7 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) { "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", id_.node, id_.ref, msg->header_.fseq_, msg->header_.msg_len_, - sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_); + sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_); } } else { break; @@ -396,7 +397,7 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) { "Error[msg disordered]", id_.node, id_.ref, fseq, chksize, - sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_, + sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_, sndqueue_.Size()); } } @@ -417,14 +418,14 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag, "RcvNack, ignore[fseq:%u, state:%u]", id_.node, id_.ref, fseq, (uint8_t)state_); - sndqueue_.MarkUnsentFrom(fseq); + sndqueue_.MarkUnsentFrom(Seq16(fseq)); return; } if (state_ != State::kRcvBuffOverflow) { state_ = State::kRcvBuffOverflow; m_MDS_LOG_DBG("FCTRL: [node:%x, ref:%u] --> Overflow ", id_.node, id_.ref); - sndqueue_.MarkUnsentFrom(fseq); + sndqueue_.MarkUnsentFrom(Seq16(fseq)); } DataMessage* msg = sndqueue_.Find(mseq, mfrag); if (msg != nullptr) { @@ -437,7 +438,7 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag, "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", id_.node, id_.ref, mseq, mfrag, fseq, - sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_); + sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_); } else { m_MDS_LOG_ERR("FCTRL: [me] --> [node:%x, ref:%u], " "RsndData[mseq:%u, mfrag:%u, fseq:%u], " @@ -450,8 +451,8 @@ void 
TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag, bool TipcPortId::ReceiveTmrTxProb(uint8_t max_txprob) { bool restart_txprob = false; if (state_ == State::kDisabled || - sndwnd_.acked_ > 1 || - rcvwnd_.rcv_ > 1) return restart_txprob; + sndwnd_.acked_ > Seq16(1) || + rcvwnd_.rcv_ > Seq16(1)) return restart_txprob; if (state_ == State::kTxProb || state_ == State::kRcvBuffOverflow) { txprob_cnt_++; if (txprob_cnt_ >= max_txprob) { @@ -484,7 +485,7 @@ void TipcPortId::ReceiveTmrChunkAck() { "ChkAckExp", id_.node, id_.ref); // send ack for @chksize msgs starting from rcvwnd_.rcv_ - SendChunkAck(rcvwnd_.rcv_, 0, chksize); + SendChunkAck(rcvwnd_.rcv_.v(), 0, chksize); rcvwnd_.acked_ = rcvwnd_.rcv_; } } diff --git a/src/mds/mds_tipc_fctrl_portid.h b/src/mds/mds_tipc_fctrl_portid.h index 119c012..cf2daaa 100644 --- a/src/mds/mds_tipc_fctrl_portid.h +++ b/src/mds/mds_tipc_fctrl_portid.h @@ -28,15 +28,78 @@ namespace mds { +class Seq16 { + public: +#define SEQ16_MAX 65536 +#define SEQ16_SPACE 32768 + uint16_t value_; + explicit Seq16(uint16_t v) { + value_ = uint16_t((uint32_t)v % SEQ16_MAX); + } + uint16_t v() { + return value_; + } + Seq16 operator + (const Seq16 add) const { + return Seq16(((uint32_t)value_ + (uint32_t)add.value_) % SEQ16_MAX); + } + + int16_t operator - (const Seq16 sub) const { + if (value_ < sub.value_ && (sub.value_ - value_ < SEQ16_SPACE)) { + return value_ - sub.value_; + } + if (value_ > sub.value_ && (value_ - sub.value_ > SEQ16_SPACE)) { + return (int32_t)value_ + SEQ16_MAX - (int32_t)sub.value_; + } + if (value_ < sub.value_ && (sub.value_ - value_ > SEQ16_SPACE)) { + return (int32_t)value_ + SEQ16_MAX - (int32_t)sub.value_; + } + if (value_ > sub.value_ && (value_ - sub.value_ < SEQ16_SPACE)) { + return value_ - sub.value_; + } + return 0; + } + Seq16 operator - (const uint16_t sub) const { + return Seq16(((uint32_t)value_ + 65536 - sub) % SEQ16_MAX); + } + void operator ++() { + value_ = (value_ + 1) % SEQ16_MAX; + } + void operator = 
(const uint16_t v) { + value_ = v % SEQ16_MAX; + } + bool operator == (const Seq16& seq) const { + return value_ == seq.value_; + } + bool operator == (uint16_t val) const { + return value_ == val; + } + bool operator <= (const Seq16& seq) { + return *this == seq || *this < seq; + } + bool operator < (const Seq16& seq) { + if (value_ < seq.value_ && (seq.value_ - value_ < SEQ16_SPACE)) return true; + if (value_ > seq.value_ && (value_ - seq.value_ > SEQ16_SPACE)) return true; + return false; + } + bool operator > (const Seq16& seq) { + if (value_ < seq.value_ && (seq.value_ - value_ > SEQ16_SPACE)) return true; + if (value_ > seq.value_ && (value_ - seq.value_ < SEQ16_SPACE)) return true; + return false; + } + bool operator >= (const Seq16& seq) { + return *this == seq || *this > seq; + } +}; + class MessageQueue { public: void Queue(DataMessage* msg); DataMessage* Find(uint32_t mseq, uint16_t mfrag); - uint64_t Erase(uint16_t fseq_from, uint16_t fseq_to); + uint64_t Erase(Seq16 fseq_from, Seq16 fseq_to); uint64_t Size() const { return queue_.size(); } void Clear(); DataMessage* FirstUnsent(); - void MarkUnsentFrom(uint16_t fseq); + void MarkUnsentFrom(Seq16 fseq); private: std::deque<DataMessage*> queue_; }; @@ -66,7 +129,7 @@ class TipcPortId { ~TipcPortId(); static uint64_t GetUniqueId(struct tipc_portid id); int GetSock() const { return bsrsock_; } - uint16_t GetCurrentSeq() { return sndwnd_.send_; } + uint16_t GetCurrentSeq() { return (uint16_t)sndwnd_.send_.v(); } bool ReceiveCapable(uint16_t sending_len); void ReceiveChunkAck(uint16_t fseq, uint16_t chunk_size); void SendChunkAck(uint16_t fseq, uint16_t svc_id, uint16_t chunk_size); @@ -92,16 +155,16 @@ class TipcPortId { struct sndwnd { // sender sequence window - uint16_t acked_{0}; // last sequence has been acked by receiver - uint16_t send_{1}; // next sequence to be sent + Seq16 acked_{0}; // last sequence has been acked by receiver + Seq16 send_{1}; // next sequence to be sent uint64_t 
nacked_space_{0}; // total bytes are sent but not acked }; struct sndwnd sndwnd_; struct rcvwnd { // receiver sequence window - uint16_t acked_{0}; // last sequence has been acked to sender - uint16_t rcv_{0}; // last sequence has been received + Seq16 acked_{0}; // last sequence has been acked to sender + Seq16 rcv_{0}; // last sequence has been received uint64_t nacked_space_{0}; // total bytes has not been acked }; struct rcvwnd rcvwnd_; -- 2.7.4 _______________________________________________ Opensaf-devel mailing list Opensaf-devel@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/opensaf-devel