This patch implements the kRcvBuffOverflow state machine as described in the README file. --- src/mds/mds_tipc_fctrl_intf.cc | 6 +- src/mds/mds_tipc_fctrl_msg.h | 1 + src/mds/mds_tipc_fctrl_portid.cc | 137 ++++++++++++++++++++++++++++++++++----- src/mds/mds_tipc_fctrl_portid.h | 5 +- 4 files changed, 131 insertions(+), 18 deletions(-)
diff --git a/src/mds/mds_tipc_fctrl_intf.cc b/src/mds/mds_tipc_fctrl_intf.cc index c2d0922..397114e 100644 --- a/src/mds/mds_tipc_fctrl_intf.cc +++ b/src/mds/mds_tipc_fctrl_intf.cc @@ -285,14 +285,16 @@ uint32_t mds_tipc_fctrl_trysend(const uint8_t *buffer, uint16_t len, rc = NCSCC_RC_FAILURE; } else { if (portid->state_ != TipcPortId::State::kDisabled) { - portid->Queue(buffer, len); + bool sendable = portid->ReceiveCapable(len); + portid->Queue(buffer, len, sendable); // start txprob timer for the first msg sent out // do not start for other states - if (portid->state_ == TipcPortId::State::kStartup) { + if (sendable && portid->state_ == TipcPortId::State::kStartup) { txprob_timer.Start(kBaseTimerInt, tmr_exp_cbk); m_MDS_LOG_DBG("FCTRL: Start txprob"); portid->state_ = TipcPortId::State::kTxProb; } + if (sendable == false) rc = NCSCC_RC_FAILURE; } } diff --git a/src/mds/mds_tipc_fctrl_msg.h b/src/mds/mds_tipc_fctrl_msg.h index 69f8048..e6b9662 100644 --- a/src/mds/mds_tipc_fctrl_msg.h +++ b/src/mds/mds_tipc_fctrl_msg.h @@ -110,6 +110,7 @@ class DataMessage: public BaseMessage { uint8_t* msg_data_{nullptr}; uint8_t snd_type_{0}; + bool is_sent_{true}; DataMessage() {} virtual ~DataMessage(); void Decode(uint8_t *msg) override; diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc index 84ecee9..e762290 100644 --- a/src/mds/mds_tipc_fctrl_portid.cc +++ b/src/mds/mds_tipc_fctrl_portid.cc @@ -82,6 +82,23 @@ uint64_t MessageQueue::Erase(uint16_t fseq_from, uint16_t fseq_to) { return msg_len; } +DataMessage* MessageQueue::FirstUnsent() { + for (auto it = queue_.begin(); it != queue_.end(); ++it) { + DataMessage *m = *it; + if (m->is_sent_ == false) { + return m; + } + } + return nullptr; +} + +void MessageQueue::MarkUnsentFrom(uint16_t fseq) { + for (auto it = queue_.begin(); it != queue_.end(); ++it) { + DataMessage *m = *it; + if (m->header_.fseq_ >= fseq) m->is_sent_ = false; + } +} + void MessageQueue::Clear() { while (queue_.empty() == 
false) { DataMessage* msg = queue_.front(); @@ -99,7 +116,8 @@ TipcPortId::TipcPortId(struct tipc_portid id, int sock, uint16_t chksize, TipcPortId::~TipcPortId() { // Fake a TmrChunkAck event to ack all received messages ReceiveTmrChunkAck(); - // clear all msg in sndqueue_ + // flush all unsent msg in sndqueue_ + FlushData(); sndqueue_.Clear(); } @@ -109,6 +127,24 @@ uint64_t TipcPortId::GetUniqueId(struct tipc_portid id) { return uid; } +void TipcPortId::FlushData() { + DataMessage* msg = nullptr; + do { + // find the lowest sequence unsent yet + msg = sndqueue_.FirstUnsent(); + if (msg != nullptr) { + Send(msg->msg_data_, msg->header_.msg_len_); + msg->is_sent_ = true; + m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], " + "FlushData[mseq:%u, mfrag:%u, fseq:%u], " + "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", + id_.node, id_.ref, + msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_, + sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_); + } + } while (msg != nullptr); +} + uint32_t TipcPortId::Send(uint8_t* data, uint16_t length) { struct sockaddr_tipc server_addr; ssize_t send_len = 0; @@ -130,29 +166,49 @@ uint32_t TipcPortId::Send(uint8_t* data, uint16_t length) { return rc; } -uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length) { +uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length, + bool is_sent) { uint32_t rc = NCSCC_RC_SUCCESS; DataMessage *msg = new DataMessage; msg->header_.Decode(const_cast<uint8_t*>(data)); msg->Decode(const_cast<uint8_t*>(data)); msg->msg_data_ = new uint8_t[length]; + msg->is_sent_ = is_sent; memcpy(msg->msg_data_, data, length); sndqueue_.Queue(msg); - ++sndwnd_.send_; - sndwnd_.nacked_space_ += length; - m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], " - "SndData[mseq:%u, mfrag:%u, fseq:%u, len:%u], " - "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", - id_.node, id_.ref, - msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_, length, - sndwnd_.acked_, sndwnd_.send_, 
sndwnd_.nacked_space_); - + if (is_sent) { + ++sndwnd_.send_; + sndwnd_.nacked_space_ += length; + m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], " + "SndData[mseq:%u, mfrag:%u, fseq:%u, len:%u], " + "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", + id_.node, id_.ref, + msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_, length, + sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_); + } else { + ++sndwnd_.send_; + m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], " + "QueData[mseq:%u, mfrag:%u, fseq:%u, len:%u], " + "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", + id_.node, id_.ref, + msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_, length, + sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_); + } return rc; } bool TipcPortId::ReceiveCapable(uint16_t sending_len) { - return true; + if (state_ == State::kRcvBuffOverflow) return false; + if (sndwnd_.nacked_space_ + sending_len < rcv_buf_size_) { + return true; + } else { + state_ = State::kRcvBuffOverflow; + m_MDS_LOG_DBG("FCTRL: [node:%x, ref:%u] --> Overflow, %" PRIu64 + ", %u, %" PRIu64, id_.node, id_.ref, sndwnd_.nacked_space_, + sending_len, rcv_buf_size_); + return false; + } } void TipcPortId::SendChunkAck(uint16_t fseq, uint16_t svc_id, @@ -297,7 +353,41 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) { // remove a number @chksize messages out of sndqueue_ and decrease // the nacked_space_ of sender - sndwnd_.nacked_space_ -= sndqueue_.Erase(fseq - chksize + 1, fseq); + uint64_t acked_bytes = sndqueue_.Erase(fseq - chksize + 1, fseq); + sndwnd_.nacked_space_ -= acked_bytes; + + // try to send a few pending msg + DataMessage* msg; + uint64_t resend_bytes = 0; + while (resend_bytes < acked_bytes) { + // find the lowest sequence unsent yet + msg = sndqueue_.FirstUnsent(); + if (msg == nullptr) { + break; + } else { + if (resend_bytes < acked_bytes) { + if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) { + sndwnd_.nacked_space_ += 
msg->header_.msg_len_; + msg->is_sent_ = true; + resend_bytes += msg->header_.msg_len_; + m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], " + "SndQData[fseq:%u, len:%u], " + "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", + id_.node, id_.ref, + msg->header_.fseq_, msg->header_.msg_len_, + sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_); + } + } else { + break; + } + } + } + // no more unsent message, back to kEnabled + if (msg == nullptr && state_ == State::kRcvBuffOverflow) { + state_ = State::kEnabled; + m_MDS_LOG_DBG("FCTRL: [node:%x, ref:%u] Overflow --> Enabled ", + id_.node, id_.ref); + } } else { m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], " "RcvChkAck[fseq:%u, chunk:%u], " @@ -322,10 +412,26 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag, fseq); return; } + if (state_ == State::kRcvBuffOverflow) { + m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], " + "RcvNack, ignore[fseq:%u, state:%u]", + id_.node, id_.ref, + fseq, (uint8_t)state_); + sndqueue_.MarkUnsentFrom(fseq); + return; + } + if (state_ != State::kRcvBuffOverflow) { + state_ = State::kRcvBuffOverflow; + m_MDS_LOG_DBG("FCTRL: [node:%x, ref:%u] --> Overflow ", + id_.node, id_.ref); + sndqueue_.MarkUnsentFrom(fseq); + } DataMessage* msg = sndqueue_.Find(mseq, mfrag); if (msg != nullptr) { // Resend the msg found - Send(msg->msg_data_, msg->header_.msg_len_); + if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) { + msg->is_sent_ = true; + } m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], " "RsndData[mseq:%u, mfrag:%u, fseq:%u], " "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", @@ -346,7 +452,7 @@ bool TipcPortId::ReceiveTmrTxProb(uint8_t max_txprob) { if (state_ == State::kDisabled || sndwnd_.acked_ > 1 || rcvwnd_.rcv_ > 1) return restart_txprob; - if (state_ == State::kTxProb) { + if (state_ == State::kTxProb || state_ == State::kRcvBuffOverflow) { txprob_cnt_++; if (txprob_cnt_ >= max_txprob) { state_ = State::kDisabled; @@ -358,6 +464,7 @@ bool 
TipcPortId::ReceiveTmrTxProb(uint8_t max_txprob) { // at kDisabled state, clear all message in sndqueue_, // receiver is at old mds version if (state_ == State::kDisabled) { + FlushData(); sndqueue_.Clear(); } diff --git a/src/mds/mds_tipc_fctrl_portid.h b/src/mds/mds_tipc_fctrl_portid.h index 0adb9dd..119c012 100644 --- a/src/mds/mds_tipc_fctrl_portid.h +++ b/src/mds/mds_tipc_fctrl_portid.h @@ -35,6 +35,8 @@ class MessageQueue { uint64_t Erase(uint16_t fseq_from, uint16_t fseq_to); uint64_t Size() const { return queue_.size(); } void Clear(); + DataMessage* FirstUnsent(); + void MarkUnsentFrom(uint16_t fseq); private: std::deque<DataMessage*> queue_; }; @@ -73,8 +75,9 @@ class TipcPortId { void ReceiveNack(uint32_t mseq, uint16_t mfrag, uint16_t fseq); bool ReceiveTmrTxProb(uint8_t max_txprob); void ReceiveTmrChunkAck(); + void FlushData(); uint32_t Send(uint8_t* data, uint16_t length); - uint32_t Queue(const uint8_t* data, uint16_t length); + uint32_t Queue(const uint8_t* data, uint16_t length, bool is_sent); uint16_t svc_cnt_{1}; // number of service subscribed on this portid -- 2.7.4 _______________________________________________ Opensaf-devel mailing list Opensaf-devel@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/opensaf-devel