This patch applies the serial number arithmetic for the flow control
sequence number, referenced to RFC1982.
This is only temporary patch, a proper one could be made in /base
with template for others type, e.g uint32. Then mds reuses it from
/base.
---
src/mds/mds_tipc_fctrl_portid.cc | 53 +++++++++++++--------------
src/mds/mds_tipc_fctrl_portid.h | 77
++++++++++++++++++++++++++++++++++++----
2 files changed, 97 insertions(+), 33 deletions(-)
diff --git a/src/mds/mds_tipc_fctrl_portid.cc
b/src/mds/mds_tipc_fctrl_portid.cc
index e762290..365d72f 100644
--- a/src/mds/mds_tipc_fctrl_portid.cc
+++ b/src/mds/mds_tipc_fctrl_portid.cc
@@ -66,12 +66,12 @@ DataMessage* MessageQueue::Find(uint32_t mseq,
uint16_t mfrag) {
return nullptr;
}
-uint64_t MessageQueue::Erase(uint16_t fseq_from, uint16_t fseq_to) {
+uint64_t MessageQueue::Erase(Seq16 fseq_from, Seq16 fseq_to) {
uint64_t msg_len = 0;
for (auto it = queue_.begin(); it != queue_.end();) {
DataMessage *m = *it;
- if (fseq_from <= m->header_.fseq_ &&
- m->header_.fseq_ <= fseq_to) {
+ if (fseq_from <= Seq16(m->header_.fseq_) &&
+ Seq16(m->header_.fseq_) <= fseq_to) {
msg_len += m->header_.msg_len_;
it = queue_.erase(it);
delete m;
@@ -92,10 +92,10 @@ DataMessage* MessageQueue::FirstUnsent() {
return nullptr;
}
-void MessageQueue::MarkUnsentFrom(uint16_t fseq) {
+void MessageQueue::MarkUnsentFrom(Seq16 fseq) {
for (auto it = queue_.begin(); it != queue_.end(); ++it) {
DataMessage *m = *it;
- if (m->header_.fseq_ >= fseq) m->is_sent_ = false;
+ if (Seq16(m->header_.fseq_) >= fseq) m->is_sent_ = false;
}
}
@@ -140,7 +140,7 @@ void TipcPortId::FlushData() {
"sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
id_.node, id_.ref,
msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_,
- sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
+ sndwnd_.acked_.v(), sndwnd_.send_.v(),
sndwnd_.nacked_space_);
}
} while (msg != nullptr);
}
@@ -185,7 +185,7 @@ uint32_t TipcPortId::Queue(const uint8_t* data,
uint16_t length,
"sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
id_.node, id_.ref,
msg->header_.mseq_, msg->header_.mfrag_,
msg->header_.fseq_, length,
- sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
+ sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
} else {
++sndwnd_.send_;
m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
@@ -193,7 +193,7 @@ uint32_t TipcPortId::Queue(const uint8_t* data,
uint16_t length,
"sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
id_.node, id_.ref,
msg->header_.mseq_, msg->header_.mfrag_,
msg->header_.fseq_, length,
- sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
+ sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
}
return rc;
}
@@ -248,13 +248,13 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq,
uint16_t mfrag,
txprob_cnt_, (uint8_t)state_);
}
// update receiver sequence window
- if (rcvwnd_.acked_ < fseq && rcvwnd_.rcv_ + 1 == fseq) {
+ if (rcvwnd_.acked_ < Seq16(fseq) && rcvwnd_.rcv_ + Seq16(1) ==
Seq16(fseq)) {
m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
"RcvData[mseq:%u, mfrag:%u, fseq:%u], "
"rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "]",
id_.node, id_.ref,
mseq, mfrag, fseq,
- rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_);
+ rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_);
++rcvwnd_.rcv_;
if (rcvwnd_.rcv_ - rcvwnd_.acked_ >= chunk_size_) {
@@ -279,7 +279,7 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq,
uint16_t mfrag,
// It is not used for now, so ignore it.
// check for transmission error
- if (rcvwnd_.rcv_ + 1 < fseq) {
+ if (rcvwnd_.rcv_ + Seq16(1) < Seq16(fseq)) {
if (rcvwnd_.rcv_ == 0 && rcvwnd_.acked_ == 0) {
// peer does not realize that this portid reset
m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
@@ -288,7 +288,7 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq,
uint16_t mfrag,
"Warning[portid reset]",
id_.node, id_.ref,
mseq, mfrag, fseq,
- rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_);
+ rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(),
rcvwnd_.nacked_space_);
rcvwnd_.rcv_ = fseq;
} else {
@@ -300,10 +300,10 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq,
uint16_t mfrag,
"Error[msg loss]",
id_.node, id_.ref,
mseq, mfrag, fseq,
- rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_);
+ rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(),
rcvwnd_.nacked_space_);
}
}
- if (fseq <= rcvwnd_.acked_) {
+ if (Seq16(fseq) <= rcvwnd_.acked_) {
rc = NCSCC_RC_FAILURE;
// unexpected retransmission
m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
@@ -312,7 +312,7 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq,
uint16_t mfrag,
"Error[unexpected retransmission]",
id_.node, id_.ref,
mseq, mfrag, fseq,
- rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_);
+ rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_);
}
}
return rc;
@@ -338,14 +338,14 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq,
uint16_t chksize) {
txprob_cnt_, (uint8_t)state_);
}
// update sender sequence window
- if (sndwnd_.acked_ < fseq && fseq < sndwnd_.send_) {
+ if (sndwnd_.acked_ < Seq16(fseq) && Seq16(fseq) < sndwnd_.send_) {
m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
"RcvChkAck[fseq:%u, chunk:%u], "
"sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "], "
"queue[size:%" PRIu64 "]",
id_.node, id_.ref,
fseq, chksize,
- sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_,
+ sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_,
sndqueue_.Size());
// fast forward the sndwnd_.acked_ sequence to fseq
@@ -353,7 +353,8 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq,
uint16_t chksize) {
// remove a number @chksize messages out of sndqueue_ and
decrease
// the nacked_space_ of sender
- uint64_t acked_bytes = sndqueue_.Erase(fseq - chksize + 1, fseq);
+ uint64_t acked_bytes = sndqueue_.Erase(Seq16(fseq) - (chksize-1),
+ Seq16(fseq));
sndwnd_.nacked_space_ -= acked_bytes;
// try to send a few pending msg
@@ -375,7 +376,7 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq,
uint16_t chksize) {
"sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
id_.node, id_.ref,
msg->header_.fseq_, msg->header_.msg_len_,
- sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
+ sndwnd_.acked_.v(), sndwnd_.send_.v(),
sndwnd_.nacked_space_);
}
} else {
break;
@@ -396,7 +397,7 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq,
uint16_t chksize) {
"Error[msg disordered]",
id_.node, id_.ref,
fseq, chksize,
- sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_,
+ sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_,
sndqueue_.Size());
}
}
@@ -417,14 +418,14 @@ void TipcPortId::ReceiveNack(uint32_t mseq,
uint16_t mfrag,
"RcvNack, ignore[fseq:%u, state:%u]",
id_.node, id_.ref,
fseq, (uint8_t)state_);
- sndqueue_.MarkUnsentFrom(fseq);
+ sndqueue_.MarkUnsentFrom(Seq16(fseq));
return;
}
if (state_ != State::kRcvBuffOverflow) {
state_ = State::kRcvBuffOverflow;
m_MDS_LOG_DBG("FCTRL: [node:%x, ref:%u] --> Overflow ",
id_.node, id_.ref);
- sndqueue_.MarkUnsentFrom(fseq);
+ sndqueue_.MarkUnsentFrom(Seq16(fseq));
}
DataMessage* msg = sndqueue_.Find(mseq, mfrag);
if (msg != nullptr) {
@@ -437,7 +438,7 @@ void TipcPortId::ReceiveNack(uint32_t mseq,
uint16_t mfrag,
"sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
id_.node, id_.ref,
mseq, mfrag, fseq,
- sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
+ sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
} else {
m_MDS_LOG_ERR("FCTRL: [me] --> [node:%x, ref:%u], "
"RsndData[mseq:%u, mfrag:%u, fseq:%u], "
@@ -450,8 +451,8 @@ void TipcPortId::ReceiveNack(uint32_t mseq,
uint16_t mfrag,
bool TipcPortId::ReceiveTmrTxProb(uint8_t max_txprob) {
bool restart_txprob = false;
if (state_ == State::kDisabled ||
- sndwnd_.acked_ > 1 ||
- rcvwnd_.rcv_ > 1) return restart_txprob;
+ sndwnd_.acked_ > Seq16(1) ||
+ rcvwnd_.rcv_ > Seq16(1)) return restart_txprob;
if (state_ == State::kTxProb || state_ == State::kRcvBuffOverflow) {
txprob_cnt_++;
if (txprob_cnt_ >= max_txprob) {
@@ -484,7 +485,7 @@ void TipcPortId::ReceiveTmrChunkAck() {
"ChkAckExp",
id_.node, id_.ref);
// send ack for @chksize msgs starting from rcvwnd_.rcv_
- SendChunkAck(rcvwnd_.rcv_, 0, chksize);
+ SendChunkAck(rcvwnd_.rcv_.v(), 0, chksize);
rcvwnd_.acked_ = rcvwnd_.rcv_;
}
}
diff --git a/src/mds/mds_tipc_fctrl_portid.h
b/src/mds/mds_tipc_fctrl_portid.h
index 119c012..cf2daaa 100644
--- a/src/mds/mds_tipc_fctrl_portid.h
+++ b/src/mds/mds_tipc_fctrl_portid.h
@@ -28,15 +28,78 @@
namespace mds {
+class Seq16 {
+ public:
+#define SEQ16_MAX 65536
+#define SEQ16_SPACE 32768
+ uint16_t value_;
+ explicit Seq16(uint16_t v) {
+ value_ = uint16_t((uint32_t)v % SEQ16_MAX);
+ }
+ uint16_t v() {
+ return value_;
+ }
+ Seq16 operator + (const Seq16 add) const {
+ return Seq16(((uint32_t)value_ + (uint32_t)add.value_) %
SEQ16_MAX);
+ }
+
+ int16_t operator - (const Seq16 sub) const {
+ if (value_ < sub.value_ && (sub.value_ - value_ < SEQ16_SPACE)) {
+ return value_ - sub.value_;
+ }
+ if (value_ > sub.value_ && (value_ - sub.value_ > SEQ16_SPACE)) {
+ return (int32_t)value_ + SEQ16_MAX - (int32_t)sub.value_;
+ }
+ if (value_ < sub.value_ && (sub.value_ - value_ > SEQ16_SPACE)) {
+ return (int32_t)value_ + SEQ16_MAX - (int32_t)sub.value_;
+ }
+ if (value_ > sub.value_ && (value_ - sub.value_ < SEQ16_SPACE)) {
+ return value_ - sub.value_;
+ }
+ return 0;
+ }
+ Seq16 operator - (const uint16_t sub) const {
+ return Seq16(((uint32_t)value_ + 65536 - sub) % SEQ16_MAX);
+ }
+ void operator ++() {
+ value_ = (value_ + 1) % SEQ16_MAX;
+ }
+ void operator = (const uint16_t v) {
+ value_ = v % SEQ16_MAX;
+ }
+ bool operator == (const Seq16& seq) const {
+ return value_ == seq.value_;
+ }
+ bool operator == (uint16_t val) const {
+ return value_ == val;
+ }
+ bool operator <= (const Seq16& seq) {
+ return *this == seq || *this < seq;
+ }
+ bool operator < (const Seq16& seq) {
+ if (value_ < seq.value_ && (seq.value_ - value_ < SEQ16_SPACE))
return true;
+ if (value_ > seq.value_ && (value_ - seq.value_ > SEQ16_SPACE))
return true;
+ return false;
+ }
+ bool operator > (const Seq16& seq) {
+ if (value_ < seq.value_ && (seq.value_ - value_ > SEQ16_SPACE))
return true;
+ if (value_ > seq.value_ && (value_ - seq.value_ < SEQ16_SPACE))
return true;
+ return false;
+ }
+ bool operator >= (const Seq16& seq) {
+ return *this == seq || *this > seq;
+ }
+};