Re: [devel] [PATCH 8/9] mds: Apply serial number arithmetic for sequence counter [#1960]
Hi Minh, I have a minor comment below. Regards, Vu On 8/14/19 1:38 PM, Minh Chau wrote: This patch applies the serial number arithmetic for the flow control sequence number, referenced to RFC1982. This is only temporary patch, a proper one could be made in /base with template for others type, e.g uint32. Then mds reuses it from /base. --- src/mds/mds_tipc_fctrl_portid.cc | 53 +-- src/mds/mds_tipc_fctrl_portid.h | 77 2 files changed, 97 insertions(+), 33 deletions(-) diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc index e762290..365d72f 100644 --- a/src/mds/mds_tipc_fctrl_portid.cc +++ b/src/mds/mds_tipc_fctrl_portid.cc @@ -66,12 +66,12 @@ DataMessage* MessageQueue::Find(uint32_t mseq, uint16_t mfrag) { return nullptr; } -uint64_t MessageQueue::Erase(uint16_t fseq_from, uint16_t fseq_to) { +uint64_t MessageQueue::Erase(Seq16 fseq_from, Seq16 fseq_to) { uint64_t msg_len = 0; for (auto it = queue_.begin(); it != queue_.end();) { DataMessage *m = *it; -if (fseq_from <= m->header_.fseq_ && -m->header_.fseq_ <= fseq_to) { +if (fseq_from <= Seq16(m->header_.fseq_) && +Seq16(m->header_.fseq_) <= fseq_to) { msg_len += m->header_.msg_len_; it = queue_.erase(it); delete m; @@ -92,10 +92,10 @@ DataMessage* MessageQueue::FirstUnsent() { return nullptr; } -void MessageQueue::MarkUnsentFrom(uint16_t fseq) { +void MessageQueue::MarkUnsentFrom(Seq16 fseq) { for (auto it = queue_.begin(); it != queue_.end(); ++it) { DataMessage *m = *it; -if (m->header_.fseq_ >= fseq) m->is_sent_ = false; +if (Seq16(m->header_.fseq_) >= fseq) m->is_sent_ = false; } } @@ -140,7 +140,7 @@ void TipcPortId::FlushData() { "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", id_.node, id_.ref, msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_, - sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_); + sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_); } } while (msg != nullptr); } @@ -185,7 +185,7 @@ uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length, "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", id_.node, id_.ref, msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_, length, -sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_); +sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_); } else { ++sndwnd_.send_; m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], " @@ -193,7 +193,7 @@ uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length, "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", id_.node, id_.ref, msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_, length, -sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_); +sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_); } return rc; } @@ -248,13 +248,13 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag, txprob_cnt_, (uint8_t)state_); } // update receiver sequence window - if (rcvwnd_.acked_ < fseq && rcvwnd_.rcv_ + 1 == fseq) { + if (rcvwnd_.acked_ < Seq16(fseq) && rcvwnd_.rcv_ + Seq16(1) == Seq16(fseq)) { m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], " "RcvData[mseq:%u, mfrag:%u, fseq:%u], " "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "]", id_.node, id_.ref, mseq, mfrag, fseq, -rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_); +rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_); ++rcvwnd_.rcv_; if (rcvwnd_.rcv_ - rcvwnd_.acked_ >= chunk_size_) { @@ -279,7 +279,7 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag, // It is not used for now, so ignore it. // check for transmission error -if (rcvwnd_.rcv_ + 1 < fseq) { +if (rcvwnd_.rcv_ + Seq16(1) < Seq16(fseq)) { if (rcvwnd_.rcv_ == 0 && rcvwnd_.acked_ == 0) { // peer does not realize that this portid reset m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], " @@ -288,7 +288,7 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag, "Warning[portid reset]", id_.node, id_.ref, mseq, mfrag, fseq, -rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_); +rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_); rcvwnd_.rcv_ = fseq; } else { @@ -300,10 +300,10 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag, "Error[msg loss]", id_.node, id_.ref, mseq, mfrag, fseq, -rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_); +rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_); } }
[devel] [PATCH 8/9] mds: Apply serial number arithmetic for sequence counter [#1960]
This patch applies the serial number arithmetic for the flow control sequence number, referenced to RFC1982. This is only temporary patch, a proper one could be made in /base with template for others type, e.g uint32. Then mds reuses it from /base. --- src/mds/mds_tipc_fctrl_portid.cc | 53 +-- src/mds/mds_tipc_fctrl_portid.h | 77 2 files changed, 97 insertions(+), 33 deletions(-) diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc index e762290..365d72f 100644 --- a/src/mds/mds_tipc_fctrl_portid.cc +++ b/src/mds/mds_tipc_fctrl_portid.cc @@ -66,12 +66,12 @@ DataMessage* MessageQueue::Find(uint32_t mseq, uint16_t mfrag) { return nullptr; } -uint64_t MessageQueue::Erase(uint16_t fseq_from, uint16_t fseq_to) { +uint64_t MessageQueue::Erase(Seq16 fseq_from, Seq16 fseq_to) { uint64_t msg_len = 0; for (auto it = queue_.begin(); it != queue_.end();) { DataMessage *m = *it; -if (fseq_from <= m->header_.fseq_ && -m->header_.fseq_ <= fseq_to) { +if (fseq_from <= Seq16(m->header_.fseq_) && +Seq16(m->header_.fseq_) <= fseq_to) { msg_len += m->header_.msg_len_; it = queue_.erase(it); delete m; @@ -92,10 +92,10 @@ DataMessage* MessageQueue::FirstUnsent() { return nullptr; } -void MessageQueue::MarkUnsentFrom(uint16_t fseq) { +void MessageQueue::MarkUnsentFrom(Seq16 fseq) { for (auto it = queue_.begin(); it != queue_.end(); ++it) { DataMessage *m = *it; -if (m->header_.fseq_ >= fseq) m->is_sent_ = false; +if (Seq16(m->header_.fseq_) >= fseq) m->is_sent_ = false; } } @@ -140,7 +140,7 @@ void TipcPortId::FlushData() { "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", id_.node, id_.ref, msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_, - sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_); + sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_); } } while (msg != nullptr); } @@ -185,7 +185,7 @@ uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length, "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", id_.node, id_.ref, msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_, length, -sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_); +sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_); } else { ++sndwnd_.send_; m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], " @@ -193,7 +193,7 @@ uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length, "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", id_.node, id_.ref, msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_, length, -sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_); +sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_); } return rc; } @@ -248,13 +248,13 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag, txprob_cnt_, (uint8_t)state_); } // update receiver sequence window - if (rcvwnd_.acked_ < fseq && rcvwnd_.rcv_ + 1 == fseq) { + if (rcvwnd_.acked_ < Seq16(fseq) && rcvwnd_.rcv_ + Seq16(1) == Seq16(fseq)) { m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], " "RcvData[mseq:%u, mfrag:%u, fseq:%u], " "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "]", id_.node, id_.ref, mseq, mfrag, fseq, -rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_); +rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_); ++rcvwnd_.rcv_; if (rcvwnd_.rcv_ - rcvwnd_.acked_ >= chunk_size_) { @@ -279,7 +279,7 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag, // It is not used for now, so ignore it. // check for transmission error -if (rcvwnd_.rcv_ + 1 < fseq) { +if (rcvwnd_.rcv_ + Seq16(1) < Seq16(fseq)) { if (rcvwnd_.rcv_ == 0 && rcvwnd_.acked_ == 0) { // peer does not realize that this portid reset m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], " @@ -288,7 +288,7 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag, "Warning[portid reset]", id_.node, id_.ref, mseq, mfrag, fseq, -rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_); +rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_); rcvwnd_.rcv_ = fseq; } else { @@ -300,10 +300,10 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag, "Error[msg loss]", id_.node, id_.ref, mseq, mfrag, fseq, -rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_); +rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_); } } -if (fseq <= rcvwnd_.acked_) { +if (Seq16(fseq) <= rcvwnd_.acked_) { rc = NCSCC_RC_FAILURE; // unexpected retransmission
[devel] [PATCH 8/9] mds: Apply serial number arithmetic for sequence counter [#1960]
This patch applies the serial number arithmetic for the flow control sequence number, referenced to RFC1982. This is only temporary patch, a proper one could be made in /base with template for others type, e.g uint32. Then mds reuses it from /base. --- src/mds/mds_tipc_fctrl_portid.cc | 53 +-- src/mds/mds_tipc_fctrl_portid.h | 77 2 files changed, 97 insertions(+), 33 deletions(-) diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc index e762290..365d72f 100644 --- a/src/mds/mds_tipc_fctrl_portid.cc +++ b/src/mds/mds_tipc_fctrl_portid.cc @@ -66,12 +66,12 @@ DataMessage* MessageQueue::Find(uint32_t mseq, uint16_t mfrag) { return nullptr; } -uint64_t MessageQueue::Erase(uint16_t fseq_from, uint16_t fseq_to) { +uint64_t MessageQueue::Erase(Seq16 fseq_from, Seq16 fseq_to) { uint64_t msg_len = 0; for (auto it = queue_.begin(); it != queue_.end();) { DataMessage *m = *it; -if (fseq_from <= m->header_.fseq_ && -m->header_.fseq_ <= fseq_to) { +if (fseq_from <= Seq16(m->header_.fseq_) && +Seq16(m->header_.fseq_) <= fseq_to) { msg_len += m->header_.msg_len_; it = queue_.erase(it); delete m; @@ -92,10 +92,10 @@ DataMessage* MessageQueue::FirstUnsent() { return nullptr; } -void MessageQueue::MarkUnsentFrom(uint16_t fseq) { +void MessageQueue::MarkUnsentFrom(Seq16 fseq) { for (auto it = queue_.begin(); it != queue_.end(); ++it) { DataMessage *m = *it; -if (m->header_.fseq_ >= fseq) m->is_sent_ = false; +if (Seq16(m->header_.fseq_) >= fseq) m->is_sent_ = false; } } @@ -140,7 +140,7 @@ void TipcPortId::FlushData() { "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", id_.node, id_.ref, msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_, - sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_); + sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_); } } while (msg != nullptr); } @@ -185,7 +185,7 @@ uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length, "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", id_.node, id_.ref, msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_, length, -sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_); +sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_); } else { ++sndwnd_.send_; m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], " @@ -193,7 +193,7 @@ uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length, "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", id_.node, id_.ref, msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_, length, -sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_); +sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_); } return rc; } @@ -248,13 +248,13 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag, txprob_cnt_, (uint8_t)state_); } // update receiver sequence window - if (rcvwnd_.acked_ < fseq && rcvwnd_.rcv_ + 1 == fseq) { + if (rcvwnd_.acked_ < Seq16(fseq) && rcvwnd_.rcv_ + Seq16(1) == Seq16(fseq)) { m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], " "RcvData[mseq:%u, mfrag:%u, fseq:%u], " "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "]", id_.node, id_.ref, mseq, mfrag, fseq, -rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_); +rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_); ++rcvwnd_.rcv_; if (rcvwnd_.rcv_ - rcvwnd_.acked_ >= chunk_size_) { @@ -279,7 +279,7 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag, // It is not used for now, so ignore it. // check for transmission error -if (rcvwnd_.rcv_ + 1 < fseq) { +if (rcvwnd_.rcv_ + Seq16(1) < Seq16(fseq)) { if (rcvwnd_.rcv_ == 0 && rcvwnd_.acked_ == 0) { // peer does not realize that this portid reset m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], " @@ -288,7 +288,7 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag, "Warning[portid reset]", id_.node, id_.ref, mseq, mfrag, fseq, -rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_); +rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_); rcvwnd_.rcv_ = fseq; } else { @@ -300,10 +300,10 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag, "Error[msg loss]", id_.node, id_.ref, mseq, mfrag, fseq, -rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_); +rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_); } } -if (fseq <= rcvwnd_.acked_) { +if (Seq16(fseq) <= rcvwnd_.acked_) { rc = NCSCC_RC_FAILURE; // unexpected retransmission