This patch adds a state machine to support the tx probation timer. --- src/mds/mds_tipc_fctrl_intf.cc | 47 +++++++++++++++-- src/mds/mds_tipc_fctrl_msg.h | 1 + src/mds/mds_tipc_fctrl_portid.cc | 109 +++++++++++++++++++++++++++++++++++++++ src/mds/mds_tipc_fctrl_portid.h | 22 ++++++++ 4 files changed, 176 insertions(+), 3 deletions(-)
diff --git a/src/mds/mds_tipc_fctrl_intf.cc b/src/mds/mds_tipc_fctrl_intf.cc index bd0a8f6..c2d0922 100644 --- a/src/mds/mds_tipc_fctrl_intf.cc +++ b/src/mds/mds_tipc_fctrl_intf.cc @@ -34,6 +34,7 @@ using mds::Event; using mds::TipcPortId; +using mds::Timer; using mds::DataMessage; using mds::ChunkAck; using mds::HeaderMessage; @@ -65,6 +66,11 @@ uint64_t sock_buf_size = 0; std::map<uint64_t, TipcPortId*> portid_map; std::mutex portid_map_mutex; +// probation timer event to enable flow control at receivers +const int64_t kBaseTimerInt = 200; // in centisecond +const uint8_t kTxProbMaxRetries = 10; +Timer txprob_timer(Event::Type::kEvtTmrTxProb); + // chunk ack parameters // todo: The chunk ack timeout and chunk ack size should be configurable int kChunkAckTimeout = 1000; // in miliseconds @@ -76,13 +82,37 @@ TipcPortId* portid_lookup(struct tipc_portid id) { return portid_map[uid]; } +void tmr_exp_cbk(void* uarg) { + Timer* timer = reinterpret_cast<Timer*>(uarg); + if (timer != nullptr) { + timer->is_active_ = false; + // send to fctrl thread + if (m_NCS_IPC_SEND(&mbx_events, new Event(timer->type_), + NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) { + m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events\n"); + } + } +} + void process_timer_event(const Event evt) { + bool txprob_restart = false; for (auto i : portid_map) { TipcPortId* portid = i.second; + + if (evt.type_ == Event::Type::kEvtTmrTxProb) { + if (portid->ReceiveTmrTxProb(kTxProbMaxRetries) == true) { + txprob_restart = true; + } + } + if (evt.type_ == Event::Type::kEvtTmrChunkAck) { portid->ReceiveTmrChunkAck(); } } + if (txprob_restart) { + txprob_timer.Start(kBaseTimerInt, tmr_exp_cbk); + m_MDS_LOG_DBG("FCTRL: Restart txprob"); + } } uint32_t process_flow_event(const Event evt) { @@ -231,8 +261,10 @@ uint32_t mds_tipc_fctrl_sndqueue_capable(struct tipc_portid id, uint16_t len, id.node, id.ref, __LINE__); rc = NCSCC_RC_FAILURE; } else { - // assign the sequence number of the outgoing message - 
*next_seq = portid->GetCurrentSeq(); + if (portid->state_ != TipcPortId::State::kDisabled) { + // assign the sequence number of the outgoing message + *next_seq = portid->GetCurrentSeq(); + } } portid_map_mutex.unlock(); @@ -252,7 +284,16 @@ uint32_t mds_tipc_fctrl_trysend(const uint8_t *buffer, uint16_t len, id.node, id.ref, __LINE__); rc = NCSCC_RC_FAILURE; } else { - portid->Queue(buffer, len); + if (portid->state_ != TipcPortId::State::kDisabled) { + portid->Queue(buffer, len); + // start txprob timer for the first msg sent out + // do not start for other states + if (portid->state_ == TipcPortId::State::kStartup) { + txprob_timer.Start(kBaseTimerInt, tmr_exp_cbk); + m_MDS_LOG_DBG("FCTRL: Start txprob"); + portid->state_ = TipcPortId::State::kTxProb; + } + } } portid_map_mutex.unlock(); diff --git a/src/mds/mds_tipc_fctrl_msg.h b/src/mds/mds_tipc_fctrl_msg.h index 8e6a874..69f8048 100644 --- a/src/mds/mds_tipc_fctrl_msg.h +++ b/src/mds/mds_tipc_fctrl_msg.h @@ -45,6 +45,7 @@ class Event { kEvtDropData, // event reported from tipc that a message is not // delivered kEvtTmrAll, + kEvtTmrTxProb, // event that tx probation timer expired for once kEvtTmrChunkAck, // event to send the chunk ack }; NCS_IPC_MSG next_{0}; diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc index 64115d5..84ecee9 100644 --- a/src/mds/mds_tipc_fctrl_portid.cc +++ b/src/mds/mds_tipc_fctrl_portid.cc @@ -23,6 +23,35 @@ namespace mds { +Timer::Timer(Event::Type type) { + tmr_id_ = nullptr; + type_ = type; + is_active_ = false; +} + +Timer::~Timer() { +} + +void Timer::Start(int64_t period, void (*tmr_exp_func)(void*)) { + // timer will not start if it's already started + // period is in centiseconds + if (is_active_ == false) { + if (tmr_id_ == nullptr) { + tmr_id_ = ncs_tmr_alloc(nullptr, 0); + } + tmr_id_ = ncs_tmr_start(tmr_id_, period, tmr_exp_func, this, + nullptr, 0); + is_active_ = true; + } +} + +void Timer::Stop() { + if (is_active_ == true) { + 
ncs_tmr_stop(tmr_id_); + is_active_ = false; + } +} + void MessageQueue::Queue(DataMessage* msg) { queue_.push_back(msg); } @@ -64,6 +93,7 @@ void MessageQueue::Clear() { TipcPortId::TipcPortId(struct tipc_portid id, int sock, uint16_t chksize, uint64_t sock_buf_size): id_(id), bsrsock_(sock), chunk_size_(chksize), rcv_buf_size_(sock_buf_size) { + state_ = State::kStartup; } TipcPortId::~TipcPortId() { @@ -144,6 +174,23 @@ void TipcPortId::SendChunkAck(uint16_t fseq, uint16_t svc_id, uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag, uint16_t fseq, uint16_t svc_id) { uint32_t rc = NCSCC_RC_SUCCESS; + if (state_ == State::kDisabled) { + m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], " + "RcvData, TxProb[retries:%u, state:%u], " + "Error[receive fseq:%u in invalid state]", + id_.node, id_.ref, + txprob_cnt_, (uint8_t)state_, + fseq); + return rc; + } + // update state + if (state_ == State::kTxProb || state_ == State::kStartup) { + state_ = State::kEnabled; + m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], " + "RcvData, TxProb[retries:%u, state:%u]", + id_.node, id_.ref, + txprob_cnt_, (uint8_t)state_); + } // update receiver sequence window if (rcvwnd_.acked_ < fseq && rcvwnd_.rcv_ + 1 == fseq) { m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], " @@ -159,6 +206,12 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag, SendChunkAck(fseq, svc_id, chunk_size_); rcvwnd_.acked_ = rcvwnd_.rcv_; rc = NCSCC_RC_CONTINUE; + } else if (fseq == 1 && rcvwnd_.acked_ == 0) { + // send ack right away for the very first data message + // to stop txprob timer at sender + SendChunkAck(fseq, svc_id, 1); + rcvwnd_.acked_ = rcvwnd_.rcv_; + rc = NCSCC_RC_CONTINUE; } } else { // todo: update rcvwnd_.nacked_space_. 
@@ -210,6 +263,24 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag, } void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) { + if (state_ == State::kDisabled) { + m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], " + "RcvData, TxProb[retries:%u, state:%u], " + "Error[receive fseq:%u in invalid state]", + id_.node, id_.ref, + txprob_cnt_, (uint8_t)state_, + fseq); + return; + } + // update state + if (state_ == State::kTxProb) { + state_ = State::kEnabled; + m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], " + "RcvChkAck, " + "TxProb[retries:%u, state:%u]", + id_.node, id_.ref, + txprob_cnt_, (uint8_t)state_); + } // update sender sequence window if (sndwnd_.acked_ < fseq && fseq < sndwnd_.send_) { m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], " @@ -242,6 +313,15 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) { void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag, uint16_t fseq) { + if (state_ == State::kDisabled) { + m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], " + "RcvNack, TxProb[retries:%u, state:%u], " + "Error[receive fseq:%u in invalid state]", + id_.node, id_.ref, + txprob_cnt_, (uint8_t)state_, + fseq); + return; + } DataMessage* msg = sndqueue_.Find(mseq, mfrag); if (msg != nullptr) { // Resend the msg found @@ -261,7 +341,36 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag, } } +bool TipcPortId::ReceiveTmrTxProb(uint8_t max_txprob) { + bool restart_txprob = false; + if (state_ == State::kDisabled || + sndwnd_.acked_ > 1 || + rcvwnd_.rcv_ > 1) return restart_txprob; + if (state_ == State::kTxProb) { + txprob_cnt_++; + if (txprob_cnt_ >= max_txprob) { + state_ = State::kDisabled; + restart_txprob = false; + } else { + restart_txprob = true; + } + + // at kDisabled state, clear all message in sndqueue_, + // receiver is at old mds version + if (state_ == State::kDisabled) { + sndqueue_.Clear(); + } + + m_MDS_LOG_DBG("FCTRL: [node:%x, ref:%u], " + "TxProbExp, TxProb[retries:%u, 
state:%u]", + id_.node, id_.ref, + txprob_cnt_, (uint8_t)state_); + } + return restart_txprob; +} + void TipcPortId::ReceiveTmrChunkAck() { + if (state_ == State::kDisabled) return; uint16_t chksize = rcvwnd_.rcv_ - rcvwnd_.acked_; if (chksize > 0) { m_MDS_LOG_DBG("FCTRL: [node:%x, ref:%u], " diff --git a/src/mds/mds_tipc_fctrl_portid.h b/src/mds/mds_tipc_fctrl_portid.h index 99beaaf..0adb9dd 100644 --- a/src/mds/mds_tipc_fctrl_portid.h +++ b/src/mds/mds_tipc_fctrl_portid.h @@ -39,8 +39,26 @@ class MessageQueue { std::deque<DataMessage*> queue_; }; +class Timer { + public: + tmr_t tmr_id_{nullptr}; + bool is_active_{false}; + Event::Type type_; + void Start(int64_t period, void (*tmr_exp_func)(void*)); + void Stop(); + explicit Timer(Event::Type type); + ~Timer(); +}; + class TipcPortId { public: + enum class State { + kDisabled, // no flow control support for this published portid + kStartup, // a newly published portid starts at this state + kTxProb, // txprob timer runs to confirm the flow control support + kEnabled, // flow control support is confirmed + kRcvBuffOverflow // the receiver's buffer overflow + }; TipcPortId(struct tipc_portid id, int sock, uint16_t chunk_size, uint64_t sock_buf_size); ~TipcPortId(); @@ -53,12 +71,16 @@ class TipcPortId { uint32_t ReceiveData(uint32_t mseq, uint16_t mfrag, uint16_t fseq, uint16_t svc_id); void ReceiveNack(uint32_t mseq, uint16_t mfrag, uint16_t fseq); + bool ReceiveTmrTxProb(uint8_t max_txprob); void ReceiveTmrChunkAck(); uint32_t Send(uint8_t* data, uint16_t length); uint32_t Queue(const uint8_t* data, uint16_t length); uint16_t svc_cnt_{1}; // number of service subscribed on this portid + State state_; + uint8_t txprob_cnt_{0}; + private: struct tipc_portid id_; int bsrsock_; // tipc socket to send/receive data per tipc_portid -- 2.7.4 _______________________________________________ Opensaf-devel mailing list Opensaf-devel@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/opensaf-devel