+ if (is_active_ == true) {
+ ncs_tmr_stop(tmr_id_);
+ is_active_ = false;
+ }
+}
+
void MessageQueue::Queue(DataMessage* msg) {
queue_.push_back(msg);  // append the message pointer at the back of queue_
}
@@ -64,6 +93,7 @@ void MessageQueue::Clear() {
TipcPortId::TipcPortId(struct tipc_portid id, int sock, uint16_t
chksize,
uint64_t sock_buf_size):
id_(id), bsrsock_(sock), chunk_size_(chksize),
rcv_buf_size_(sock_buf_size) {
+ state_ = State::kStartup;  // a newly constructed portid begins in kStartup
}
TipcPortId::~TipcPortId() {
@@ -144,6 +174,23 @@ void TipcPortId::SendChunkAck(uint16_t fseq, uint16_t svc_id,
uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
uint16_t fseq, uint16_t svc_id) {
uint32_t rc = NCSCC_RC_SUCCESS;
+ if (state_ == State::kDisabled) {
+ m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
+ "RcvData, TxProb[retries:%u, state:%u], "
+ "Error[receive fseq:%u in invalid state]",
+ id_.node, id_.ref,
+ txprob_cnt_, (uint8_t)state_,
+ fseq);
+ return rc;
+ }
+ // update state
+ if (state_ == State::kTxProb || state_ == State::kStartup) {
+ state_ = State::kEnabled;
+ m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
+ "RcvData, TxProb[retries:%u, state:%u]",
+ id_.node, id_.ref,
+ txprob_cnt_, (uint8_t)state_);
+ }
// update receiver sequence window
if (rcvwnd_.acked_ < fseq && rcvwnd_.rcv_ + 1 == fseq) {
m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
@@ -159,6 +206,12 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
SendChunkAck(fseq, svc_id, chunk_size_);
rcvwnd_.acked_ = rcvwnd_.rcv_;
rc = NCSCC_RC_CONTINUE;
+ } else if (fseq == 1 && rcvwnd_.acked_ == 0) {
+ // send ack right away for the very first data message
+ // to stop txprob timer at sender
+ SendChunkAck(fseq, svc_id, 1);
+ rcvwnd_.acked_ = rcvwnd_.rcv_;
+ rc = NCSCC_RC_CONTINUE;
}
} else {
// todo: update rcvwnd_.nacked_space_.
@@ -210,6 +263,24 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
}
void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) {
+  if (state_ == State::kDisabled) {
+    m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
+                  "RcvChkAck, TxProb[retries:%u, state:%u], "
+                  "Error[receive fseq:%u in invalid state]",
+                  id_.node, id_.ref,
+                  txprob_cnt_, (uint8_t)state_,
+                  fseq);
+    return;  // flow control is disabled for this portid: ignore the chunk ack
+  }
+ // update state
+ if (state_ == State::kTxProb) {
+ state_ = State::kEnabled;
+ m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
+ "RcvChkAck, "
+ "TxProb[retries:%u, state:%u]",
+ id_.node, id_.ref,
+ txprob_cnt_, (uint8_t)state_);
+ }
// update sender sequence window
if (sndwnd_.acked_ < fseq && fseq < sndwnd_.send_) {
m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
@@ -242,6 +313,15 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) {
void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag,
uint16_t fseq) {
+ if (state_ == State::kDisabled) {
+ m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
+ "RcvNack, TxProb[retries:%u, state:%u], "
+ "Error[receive fseq:%u in invalid state]",
+ id_.node, id_.ref,
+ txprob_cnt_, (uint8_t)state_,
+ fseq);
+ return;
+ }
DataMessage* msg = sndqueue_.Find(mseq, mfrag);
if (msg != nullptr) {
// Resend the msg found
@@ -261,7 +341,36 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag,
}
}
+// Handles one expiry of the txprob timer for this portid.
+// Returns true when the timer should be restarted for another probe, and
+// false when probing is finished or does not apply.
+bool TipcPortId::ReceiveTmrTxProb(uint8_t max_txprob) {
+  // Ignore the expiry when flow control is already disabled, or when either
+  // window counter (sndwnd_.acked_ / rcvwnd_.rcv_) has moved beyond 1.
+  const bool skip_probe = state_ == State::kDisabled ||
+                          sndwnd_.acked_ > 1 ||
+                          rcvwnd_.rcv_ > 1;
+  if (skip_probe || state_ != State::kTxProb) return false;
+
+  ++txprob_cnt_;
+  const bool retry = txprob_cnt_ < max_txprob;
+  if (!retry) {
+    // Retry budget used up: the receiver is taken to be at an old mds
+    // version, so disable flow control and clear all messages in sndqueue_.
+    state_ = State::kDisabled;
+    sndqueue_.Clear();
+  }
+
+  // Logged after the update so the trace carries the new counter and state.
+  m_MDS_LOG_DBG("FCTRL: [node:%x, ref:%u], "
+                "TxProbExp, TxProb[retries:%u, state:%u]",
+                id_.node, id_.ref,
+                txprob_cnt_, (uint8_t)state_);
+  return retry;
+}
+
void TipcPortId::ReceiveTmrChunkAck() {
+ if (state_ == State::kDisabled) return;
uint16_t chksize = rcvwnd_.rcv_ - rcvwnd_.acked_;
if (chksize > 0) {
m_MDS_LOG_DBG("FCTRL: [node:%x, ref:%u], "
diff --git a/src/mds/mds_tipc_fctrl_portid.h b/src/mds/mds_tipc_fctrl_portid.h
index 99beaaf..0adb9dd 100644
--- a/src/mds/mds_tipc_fctrl_portid.h
+++ b/src/mds/mds_tipc_fctrl_portid.h
@@ -39,8 +39,26 @@ class MessageQueue {
std::deque<DataMessage*> queue_;
};
+class Timer {  // a tmr_t handle together with its activity flag and type
+ public:
+  explicit Timer(Event::Type type);
+  ~Timer();
+  void Start(int64_t period, void (*tmr_exp_func)(void*));
+  void Stop();
+  tmr_t tmr_id_{nullptr};  // timer handle; null by default
+  bool is_active_{false};  // false by default
+  Event::Type type_;       // no default here; presumably set by the ctor
+};
+
class TipcPortId {
public:
+  enum class State {
+    kDisabled,        // no flow control support for this published portid
+    kStartup,         // a newly published portid starts at this state
+    kTxProb,          // txprob timer runs to confirm the flow control support
+    kEnabled,         // flow control support is confirmed
+    kRcvBuffOverflow  // the receiver's buffer has overflowed
+  };
TipcPortId(struct tipc_portid id, int sock, uint16_t chunk_size,
uint64_t sock_buf_size);
~TipcPortId();
@@ -53,12 +71,16 @@ class TipcPortId {
uint32_t ReceiveData(uint32_t mseq, uint16_t mfrag,
uint16_t fseq, uint16_t svc_id);
void ReceiveNack(uint32_t mseq, uint16_t mfrag, uint16_t fseq);
+ bool ReceiveTmrTxProb(uint8_t max_txprob);
void ReceiveTmrChunkAck();
uint32_t Send(uint8_t* data, uint16_t length);
uint32_t Queue(const uint8_t* data, uint16_t length);
uint16_t svc_cnt_{1}; // number of service subscribed on this portid
+ State state_;
+ uint8_t txprob_cnt_{0};
+
private:
struct tipc_portid id_;
int bsrsock_; // tipc socket to send/receive data per tipc_portid