Hi Minh,
I has few comments below.
Regards, Vu
On 8/14/19 1:38 PM, Minh Chau wrote:
This patch implements the kRcvBuffOverflow state machine as
described in README file.
---
src/mds/mds_tipc_fctrl_intf.cc | 6 +-
src/mds/mds_tipc_fctrl_msg.h | 1 +
src/mds/mds_tipc_fctrl_portid.cc | 137
++++++++++++++++++++++++++++++++++-----
src/mds/mds_tipc_fctrl_portid.h | 5 +-
4 files changed, 131 insertions(+), 18 deletions(-)
diff --git a/src/mds/mds_tipc_fctrl_intf.cc
b/src/mds/mds_tipc_fctrl_intf.cc
index c2d0922..397114e 100644
--- a/src/mds/mds_tipc_fctrl_intf.cc
+++ b/src/mds/mds_tipc_fctrl_intf.cc
@@ -285,14 +285,16 @@ uint32_t mds_tipc_fctrl_trysend(const uint8_t
*buffer, uint16_t len,
rc = NCSCC_RC_FAILURE;
} else {
if (portid->state_ != TipcPortId::State::kDisabled) {
- portid->Queue(buffer, len);
+ bool sendable = portid->ReceiveCapable(len);
+ portid->Queue(buffer, len, sendable);
// start txprob timer for the first msg sent out
// do not start for other states
- if (portid->state_ == TipcPortId::State::kStartup) {
+ if (sendable && portid->state_ == TipcPortId::State::kStartup) {
txprob_timer.Start(kBaseTimerInt, tmr_exp_cbk);
m_MDS_LOG_DBG("FCTRL: Start txprob");
portid->state_ = TipcPortId::State::kTxProb;
}
+ if (sendable == false) rc = NCSCC_RC_FAILURE;
}
}
diff --git a/src/mds/mds_tipc_fctrl_msg.h
b/src/mds/mds_tipc_fctrl_msg.h
index 69f8048..e6b9662 100644
--- a/src/mds/mds_tipc_fctrl_msg.h
+++ b/src/mds/mds_tipc_fctrl_msg.h
@@ -110,6 +110,7 @@ class DataMessage: public BaseMessage {
uint8_t* msg_data_{nullptr};
uint8_t snd_type_{0};
+ bool is_sent_{true};
DataMessage() {}
virtual ~DataMessage();
void Decode(uint8_t *msg) override;
diff --git a/src/mds/mds_tipc_fctrl_portid.cc
b/src/mds/mds_tipc_fctrl_portid.cc
index 84ecee9..e762290 100644
--- a/src/mds/mds_tipc_fctrl_portid.cc
+++ b/src/mds/mds_tipc_fctrl_portid.cc
@@ -82,6 +82,23 @@ uint64_t MessageQueue::Erase(uint16_t fseq_from,
uint16_t fseq_to) {
return msg_len;
}
+DataMessage* MessageQueue::FirstUnsent() {
+ for (auto it = queue_.begin(); it != queue_.end(); ++it) {
[Vu] Use the shorter version `for (const auto& it : queue_)
+ DataMessage *m = *it;
+ if (m->is_sent_ == false) {
+ return m;
+ }
+ }
+ return nullptr;
+}
+
+void MessageQueue::MarkUnsentFrom(uint16_t fseq) {
+ for (auto it = queue_.begin(); it != queue_.end(); ++it) {
[Vu] as above comment
+ DataMessage *m = *it;
+ if (m->header_.fseq_ >= fseq) m->is_sent_ = false;
+ }
+}
+
void MessageQueue::Clear() {
while (queue_.empty() == false) {
DataMessage* msg = queue_.front();
@@ -99,7 +116,8 @@ TipcPortId::TipcPortId(struct tipc_portid id, int
sock, uint16_t chksize,
TipcPortId::~TipcPortId() {
// Fake a TmrChunkAck event to ack all received messages
ReceiveTmrChunkAck();
- // clear all msg in sndqueue_
+ // flush all unsent msg in sndqueue_
+ FlushData();
sndqueue_.Clear();
[Vu] If sndqueue_.Clear() must be called every time calling
`FlushData`, should move `Clear()` into FlushData() ?
}
@@ -109,6 +127,24 @@ uint64_t TipcPortId::GetUniqueId(struct
tipc_portid id) {
return uid;
}
+void TipcPortId::FlushData() {
+ DataMessage* msg = nullptr;
+ do {
+ // find the lowest sequence unsent yet
+ msg = sndqueue_.FirstUnsent();
+ if (msg != nullptr) {
+ Send(msg->msg_data_, msg->header_.msg_len_);
+ msg->is_sent_ = true;
+ m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
+ "FlushData[mseq:%u, mfrag:%u, fseq:%u], "
+ "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
+ id_.node, id_.ref,
+ msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_,
+ sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
+ }
+ } while (msg != nullptr);
+}
+
uint32_t TipcPortId::Send(uint8_t* data, uint16_t length) {
struct sockaddr_tipc server_addr;
ssize_t send_len = 0;
@@ -130,29 +166,49 @@ uint32_t TipcPortId::Send(uint8_t* data,
uint16_t length) {
return rc;
}
-uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length) {
+uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length,
+ bool is_sent) {
uint32_t rc = NCSCC_RC_SUCCESS;
DataMessage *msg = new DataMessage;
msg->header_.Decode(const_cast<uint8_t*>(data));
msg->Decode(const_cast<uint8_t*>(data));
msg->msg_data_ = new uint8_t[length];
+ msg->is_sent_ = is_sent;
memcpy(msg->msg_data_, data, length);
sndqueue_.Queue(msg);
- ++sndwnd_.send_;
- sndwnd_.nacked_space_ += length;
- m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
- "SndData[mseq:%u, mfrag:%u, fseq:%u, len:%u], "
- "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
- id_.node, id_.ref,
- msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_,
length,
- sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
-
+ if (is_sent) {
+ ++sndwnd_.send_;
+ sndwnd_.nacked_space_ += length;
+ m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
+ "SndData[mseq:%u, mfrag:%u, fseq:%u, len:%u], "
+ "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
+ id_.node, id_.ref,
+ msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_,
length,
+ sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
+ } else {
+ ++sndwnd_.send_;
+ m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
+ "QueData[mseq:%u, mfrag:%u, fseq:%u, len:%u], "
+ "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
+ id_.node, id_.ref,
+ msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_,
length,
+ sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
+ }
return rc;
}
bool TipcPortId::ReceiveCapable(uint16_t sending_len) {
- return true;
+ if (state_ == State::kRcvBuffOverflow) return false;
+ if (sndwnd_.nacked_space_ + sending_len < rcv_buf_size_) {
+ return true;
+ } else {
+ state_ = State::kRcvBuffOverflow;
+ m_MDS_LOG_DBG("FCTRL: [node:%x, ref:%u] --> Overflow, %" PRIu64
+ ", %u, %" PRIu64, id_.node, id_.ref, sndwnd_.nacked_space_,
+ sending_len, rcv_buf_size_);
+ return false;
+ }
}
void TipcPortId::SendChunkAck(uint16_t fseq, uint16_t svc_id,
@@ -297,7 +353,41 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq,
uint16_t chksize) {
// remove a number @chksize messages out of sndqueue_ and
decrease
// the nacked_space_ of sender
- sndwnd_.nacked_space_ -= sndqueue_.Erase(fseq - chksize + 1, fseq);
+ uint64_t acked_bytes = sndqueue_.Erase(fseq - chksize + 1, fseq);
+ sndwnd_.nacked_space_ -= acked_bytes;
+
+ // try to send a few pending msg
+ DataMessage* msg;
[Vu] Should initialize `msg` to null here to avoid accessing
uninitialized data in below `if` case.
+ uint64_t resend_bytes = 0;
+ while (resend_bytes < acked_bytes) {
+ // find the lowest sequence unsent yet
+ msg = sndqueue_.FirstUnsent();
+ if (msg == nullptr) {
+ break;
+ } else {
+ if (resend_bytes < acked_bytes) {
+ if (Send(msg->msg_data_, msg->header_.msg_len_) ==
NCSCC_RC_SUCCESS) {
+ sndwnd_.nacked_space_ += msg->header_.msg_len_;
+ msg->is_sent_ = true;
+ resend_bytes += msg->header_.msg_len_;
+ m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
+ "SndQData[fseq:%u, len:%u], "
+ "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
+ id_.node, id_.ref,
+ msg->header_.fseq_, msg->header_.msg_len_,
+ sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
+ }
+ } else {
+ break;
+ }
+ }
+ }
+ // no more unsent message, back to kEnabled
+ if (msg == nullptr && state_ == State::kRcvBuffOverflow) {
+ state_ = State::kEnabled;
+ m_MDS_LOG_DBG("FCTRL: [node:%x, ref:%u] Overflow --> Enabled ",
+ id_.node, id_.ref);
+ }
} else {
m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
"RcvChkAck[fseq:%u, chunk:%u], "
@@ -322,10 +412,26 @@ void TipcPortId::ReceiveNack(uint32_t mseq,
uint16_t mfrag,
fseq);
return;
}
+ if (state_ == State::kRcvBuffOverflow) {
+ m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
+ "RcvNack, ignore[fseq:%u, state:%u]",
+ id_.node, id_.ref,
+ fseq, (uint8_t)state_);
+ sndqueue_.MarkUnsentFrom(fseq);
+ return;
+ }
+ if (state_ != State::kRcvBuffOverflow) {
+ state_ = State::kRcvBuffOverflow;
+ m_MDS_LOG_DBG("FCTRL: [node:%x, ref:%u] --> Overflow ",
+ id_.node, id_.ref);
+ sndqueue_.MarkUnsentFrom(fseq);
+ }
DataMessage* msg = sndqueue_.Find(mseq, mfrag);
if (msg != nullptr) {
// Resend the msg found
- Send(msg->msg_data_, msg->header_.msg_len_);
+ if (Send(msg->msg_data_, msg->header_.msg_len_) ==
NCSCC_RC_SUCCESS) {
+ msg->is_sent_ = true;
+ }
m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
"RsndData[mseq:%u, mfrag:%u, fseq:%u], "
"sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
@@ -346,7 +452,7 @@ bool TipcPortId::ReceiveTmrTxProb(uint8_t
max_txprob) {
if (state_ == State::kDisabled ||
sndwnd_.acked_ > 1 ||
rcvwnd_.rcv_ > 1) return restart_txprob;
- if (state_ == State::kTxProb) {
+ if (state_ == State::kTxProb || state_ == State::kRcvBuffOverflow) {
txprob_cnt_++;
if (txprob_cnt_ >= max_txprob) {
state_ = State::kDisabled;
@@ -358,6 +464,7 @@ bool TipcPortId::ReceiveTmrTxProb(uint8_t
max_txprob) {
// at kDisabled state, clear all message in sndqueue_,
// receiver is at old mds version
if (state_ == State::kDisabled) {
+ FlushData();
sndqueue_.Clear();
}
diff --git a/src/mds/mds_tipc_fctrl_portid.h
b/src/mds/mds_tipc_fctrl_portid.h
index 0adb9dd..119c012 100644
--- a/src/mds/mds_tipc_fctrl_portid.h
+++ b/src/mds/mds_tipc_fctrl_portid.h
@@ -35,6 +35,8 @@ class MessageQueue {
uint64_t Erase(uint16_t fseq_from, uint16_t fseq_to);
uint64_t Size() const { return queue_.size(); }
void Clear();
+ DataMessage* FirstUnsent();
+ void MarkUnsentFrom(uint16_t fseq);
private:
std::deque<DataMessage*> queue_;
};
@@ -73,8 +75,9 @@ class TipcPortId {
void ReceiveNack(uint32_t mseq, uint16_t mfrag, uint16_t fseq);
bool ReceiveTmrTxProb(uint8_t max_txprob);
void ReceiveTmrChunkAck();
+ void FlushData();
uint32_t Send(uint8_t* data, uint16_t length);
- uint32_t Queue(const uint8_t* data, uint16_t length);
+ uint32_t Queue(const uint8_t* data, uint16_t length, bool is_sent);
uint16_t svc_cnt_{1}; // number of service subscribed on this
portid