Re: [devel] [PATCH 8/9] mds: Apply serial number arithmetic for sequence counter [#1960]

2019-09-16 Thread Nguyen Minh Vu

Hi Minh,

I have a minor comment below.

Regards, Vu

On 8/14/19 1:38 PM, Minh Chau wrote:

This patch applies serial number arithmetic to the flow control
sequence number, as defined in RFC 1982.

This is only a temporary patch; a proper one could be made in /base
as a template that also covers other types, e.g. uint32. Then mds
could reuse it from /base.
---
  src/mds/mds_tipc_fctrl_portid.cc | 53 +--
  src/mds/mds_tipc_fctrl_portid.h  | 77 
  2 files changed, 97 insertions(+), 33 deletions(-)

diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc
index e762290..365d72f 100644
--- a/src/mds/mds_tipc_fctrl_portid.cc
+++ b/src/mds/mds_tipc_fctrl_portid.cc
@@ -66,12 +66,12 @@ DataMessage* MessageQueue::Find(uint32_t mseq, uint16_t mfrag) {
return nullptr;
  }
  
-uint64_t MessageQueue::Erase(uint16_t fseq_from, uint16_t fseq_to) {

+uint64_t MessageQueue::Erase(Seq16 fseq_from, Seq16 fseq_to) {
uint64_t msg_len = 0;
for (auto it = queue_.begin(); it != queue_.end();) {
  DataMessage *m = *it;
-if (fseq_from <= m->header_.fseq_ &&
-m->header_.fseq_ <= fseq_to) {
+if (fseq_from <= Seq16(m->header_.fseq_) &&
+Seq16(m->header_.fseq_) <= fseq_to) {
msg_len += m->header_.msg_len_;
it = queue_.erase(it);
delete m;
@@ -92,10 +92,10 @@ DataMessage* MessageQueue::FirstUnsent() {
return nullptr;
  }
  
-void MessageQueue::MarkUnsentFrom(uint16_t fseq) {

+void MessageQueue::MarkUnsentFrom(Seq16 fseq) {
for (auto it = queue_.begin(); it != queue_.end(); ++it) {
  DataMessage *m = *it;
-if (m->header_.fseq_ >= fseq) m->is_sent_ = false;
+if (Seq16(m->header_.fseq_) >= fseq) m->is_sent_ = false;
}
  }
  
@@ -140,7 +140,7 @@ void TipcPortId::FlushData() {

"sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
id_.node, id_.ref,
msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_,
-  sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
+  sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
  }
} while (msg != nullptr);
  }
@@ -185,7 +185,7 @@ uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length,
  "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
  id_.node, id_.ref,
  msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_, length,
-sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
+sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
} else {
  ++sndwnd_.send_;
  m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
@@ -193,7 +193,7 @@ uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length,
  "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
  id_.node, id_.ref,
  msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_, length,
-sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
+sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
}
return rc;
  }
@@ -248,13 +248,13 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
  txprob_cnt_, (uint8_t)state_);
}
// update receiver sequence window
-  if (rcvwnd_.acked_ < fseq && rcvwnd_.rcv_ + 1 == fseq) {
+  if (rcvwnd_.acked_ < Seq16(fseq) && rcvwnd_.rcv_ + Seq16(1) == Seq16(fseq)) {
  m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
  "RcvData[mseq:%u, mfrag:%u, fseq:%u], "
  "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "]",
  id_.node, id_.ref,
  mseq, mfrag, fseq,
-rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_);
+rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_);
  
  ++rcvwnd_.rcv_;

  if (rcvwnd_.rcv_ - rcvwnd_.acked_ >= chunk_size_) {
@@ -279,7 +279,7 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
  // It is not used for now, so ignore it.
  
  // check for transmission error

-if (rcvwnd_.rcv_ + 1 < fseq) {
+if (rcvwnd_.rcv_ + Seq16(1) < Seq16(fseq)) {
if (rcvwnd_.rcv_ == 0 && rcvwnd_.acked_ == 0) {
  // peer does not realize that this portid reset
  m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
@@ -288,7 +288,7 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
  "Warning[portid reset]",
  id_.node, id_.ref,
  mseq, mfrag, fseq,
-rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_);
+rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_);
  
  rcvwnd_.rcv_ = fseq;

} else {
@@ -300,10 +300,10 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
  "Error[msg loss]",
  id_.node, id_.ref,
  mseq, mfrag, fseq,
-rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_);
+rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_);
}
  }

[devel] [PATCH 8/9] mds: Apply serial number arithmetic for sequence counter [#1960]

2019-08-14 Thread Minh Chau
This patch applies serial number arithmetic to the flow control
sequence number, as defined in RFC 1982.

This is only a temporary patch; a proper one could be made in /base
as a template that also covers other types, e.g. uint32. Then mds
could reuse it from /base.
---
 src/mds/mds_tipc_fctrl_portid.cc | 53 +--
 src/mds/mds_tipc_fctrl_portid.h  | 77 
 2 files changed, 97 insertions(+), 33 deletions(-)

diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc
index e762290..365d72f 100644
--- a/src/mds/mds_tipc_fctrl_portid.cc
+++ b/src/mds/mds_tipc_fctrl_portid.cc
@@ -66,12 +66,12 @@ DataMessage* MessageQueue::Find(uint32_t mseq, uint16_t mfrag) {
   return nullptr;
 }
 
-uint64_t MessageQueue::Erase(uint16_t fseq_from, uint16_t fseq_to) {
+uint64_t MessageQueue::Erase(Seq16 fseq_from, Seq16 fseq_to) {
   uint64_t msg_len = 0;
   for (auto it = queue_.begin(); it != queue_.end();) {
 DataMessage *m = *it;
-if (fseq_from <= m->header_.fseq_ &&
-m->header_.fseq_ <= fseq_to) {
+if (fseq_from <= Seq16(m->header_.fseq_) &&
+Seq16(m->header_.fseq_) <= fseq_to) {
   msg_len += m->header_.msg_len_;
   it = queue_.erase(it);
   delete m;
@@ -92,10 +92,10 @@ DataMessage* MessageQueue::FirstUnsent() {
   return nullptr;
 }
 
-void MessageQueue::MarkUnsentFrom(uint16_t fseq) {
+void MessageQueue::MarkUnsentFrom(Seq16 fseq) {
   for (auto it = queue_.begin(); it != queue_.end(); ++it) {
 DataMessage *m = *it;
-if (m->header_.fseq_ >= fseq) m->is_sent_ = false;
+if (Seq16(m->header_.fseq_) >= fseq) m->is_sent_ = false;
   }
 }
 
@@ -140,7 +140,7 @@ void TipcPortId::FlushData() {
   "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
   id_.node, id_.ref,
   msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_,
-  sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
+  sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
 }
   } while (msg != nullptr);
 }
@@ -185,7 +185,7 @@ uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length,
 "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
 id_.node, id_.ref,
 msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_, length,
-sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
+sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
   } else {
 ++sndwnd_.send_;
 m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
@@ -193,7 +193,7 @@ uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length,
 "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
 id_.node, id_.ref,
 msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_, length,
-sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
+sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
   }
   return rc;
 }
@@ -248,13 +248,13 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
 txprob_cnt_, (uint8_t)state_);
   }
   // update receiver sequence window
-  if (rcvwnd_.acked_ < fseq && rcvwnd_.rcv_ + 1 == fseq) {
+  if (rcvwnd_.acked_ < Seq16(fseq) && rcvwnd_.rcv_ + Seq16(1) == Seq16(fseq)) {
 m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
 "RcvData[mseq:%u, mfrag:%u, fseq:%u], "
 "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "]",
 id_.node, id_.ref,
 mseq, mfrag, fseq,
-rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_);
+rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_);
 
 ++rcvwnd_.rcv_;
 if (rcvwnd_.rcv_ - rcvwnd_.acked_ >= chunk_size_) {
@@ -279,7 +279,7 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
 // It is not used for now, so ignore it.
 
 // check for transmission error
-if (rcvwnd_.rcv_ + 1 < fseq) {
+if (rcvwnd_.rcv_ + Seq16(1) < Seq16(fseq)) {
   if (rcvwnd_.rcv_ == 0 && rcvwnd_.acked_ == 0) {
 // peer does not realize that this portid reset
 m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
@@ -288,7 +288,7 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
 "Warning[portid reset]",
 id_.node, id_.ref,
 mseq, mfrag, fseq,
-rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_);
+rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_);
 
 rcvwnd_.rcv_ = fseq;
   } else {
@@ -300,10 +300,10 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
 "Error[msg loss]",
 id_.node, id_.ref,
 mseq, mfrag, fseq,
-rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_);
+rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_);
   }
 }
-if (fseq <= rcvwnd_.acked_) {
+if (Seq16(fseq) <= rcvwnd_.acked_) {
   rc = NCSCC_RC_FAILURE;
   // unexpected retransmission
   

[devel] [PATCH 8/9] mds: Apply serial number arithmetic for sequence counter [#1960]

2019-08-14 Thread Minh Chau
This patch applies serial number arithmetic to the flow control
sequence number, as defined in RFC 1982.

This is only a temporary patch; a proper one could be made in /base
as a template that also covers other types, e.g. uint32. Then mds
could reuse it from /base.
---
 src/mds/mds_tipc_fctrl_portid.cc | 53 +--
 src/mds/mds_tipc_fctrl_portid.h  | 77 
 2 files changed, 97 insertions(+), 33 deletions(-)

diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc
index e762290..365d72f 100644
--- a/src/mds/mds_tipc_fctrl_portid.cc
+++ b/src/mds/mds_tipc_fctrl_portid.cc
@@ -66,12 +66,12 @@ DataMessage* MessageQueue::Find(uint32_t mseq, uint16_t mfrag) {
   return nullptr;
 }
 
-uint64_t MessageQueue::Erase(uint16_t fseq_from, uint16_t fseq_to) {
+uint64_t MessageQueue::Erase(Seq16 fseq_from, Seq16 fseq_to) {
   uint64_t msg_len = 0;
   for (auto it = queue_.begin(); it != queue_.end();) {
 DataMessage *m = *it;
-if (fseq_from <= m->header_.fseq_ &&
-m->header_.fseq_ <= fseq_to) {
+if (fseq_from <= Seq16(m->header_.fseq_) &&
+Seq16(m->header_.fseq_) <= fseq_to) {
   msg_len += m->header_.msg_len_;
   it = queue_.erase(it);
   delete m;
@@ -92,10 +92,10 @@ DataMessage* MessageQueue::FirstUnsent() {
   return nullptr;
 }
 
-void MessageQueue::MarkUnsentFrom(uint16_t fseq) {
+void MessageQueue::MarkUnsentFrom(Seq16 fseq) {
   for (auto it = queue_.begin(); it != queue_.end(); ++it) {
 DataMessage *m = *it;
-if (m->header_.fseq_ >= fseq) m->is_sent_ = false;
+if (Seq16(m->header_.fseq_) >= fseq) m->is_sent_ = false;
   }
 }
 
@@ -140,7 +140,7 @@ void TipcPortId::FlushData() {
   "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
   id_.node, id_.ref,
   msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_,
-  sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
+  sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
 }
   } while (msg != nullptr);
 }
@@ -185,7 +185,7 @@ uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length,
 "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
 id_.node, id_.ref,
 msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_, length,
-sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
+sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
   } else {
 ++sndwnd_.send_;
 m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
@@ -193,7 +193,7 @@ uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length,
 "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
 id_.node, id_.ref,
 msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_, length,
-sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
+sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
   }
   return rc;
 }
@@ -248,13 +248,13 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
 txprob_cnt_, (uint8_t)state_);
   }
   // update receiver sequence window
-  if (rcvwnd_.acked_ < fseq && rcvwnd_.rcv_ + 1 == fseq) {
+  if (rcvwnd_.acked_ < Seq16(fseq) && rcvwnd_.rcv_ + Seq16(1) == Seq16(fseq)) {
 m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
 "RcvData[mseq:%u, mfrag:%u, fseq:%u], "
 "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "]",
 id_.node, id_.ref,
 mseq, mfrag, fseq,
-rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_);
+rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_);
 
 ++rcvwnd_.rcv_;
 if (rcvwnd_.rcv_ - rcvwnd_.acked_ >= chunk_size_) {
@@ -279,7 +279,7 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
 // It is not used for now, so ignore it.
 
 // check for transmission error
-if (rcvwnd_.rcv_ + 1 < fseq) {
+if (rcvwnd_.rcv_ + Seq16(1) < Seq16(fseq)) {
   if (rcvwnd_.rcv_ == 0 && rcvwnd_.acked_ == 0) {
 // peer does not realize that this portid reset
 m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
@@ -288,7 +288,7 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
 "Warning[portid reset]",
 id_.node, id_.ref,
 mseq, mfrag, fseq,
-rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_);
+rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_);
 
 rcvwnd_.rcv_ = fseq;
   } else {
@@ -300,10 +300,10 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
 "Error[msg loss]",
 id_.node, id_.ref,
 mseq, mfrag, fseq,
-rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_);
+rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_);
   }
 }
-if (fseq <= rcvwnd_.acked_) {
+if (Seq16(fseq) <= rcvwnd_.acked_) {
   rc = NCSCC_RC_FAILURE;
   // unexpected retransmission