mds relies on a data message sent from the peer to determine whether MDS_TIPC_FCTRL_ENABLED is set at the peer. The data message may not be sent right after the TIPC_PUBLISHED event, which can cause the tx probation timer to time out.
This patch adds a Reset message, which is sent right after the TIPC_PUBLISHED event to help mds determine earlier whether flow control is supported at the peer. --- src/mds/mds_main.c | 2 +- src/mds/mds_tipc_fctrl_intf.cc | 27 ++++++++++++++++++++++ src/mds/mds_tipc_fctrl_msg.cc | 11 +++++++++ src/mds/mds_tipc_fctrl_msg.h | 18 +++++++++++++++ src/mds/mds_tipc_fctrl_portid.cc | 49 ++++++++++++++++++++++++++++++---------- src/mds/mds_tipc_fctrl_portid.h | 2 ++ 6 files changed, 96 insertions(+), 13 deletions(-) diff --git a/src/mds/mds_main.c b/src/mds/mds_main.c index 8c9b1f1..c7d2f7b 100644 --- a/src/mds/mds_main.c +++ b/src/mds/mds_main.c @@ -408,7 +408,7 @@ uint32_t mds_lib_req(NCS_LIB_REQ_INFO *req) if (tipc_mcast_enabled != false) tipc_mcast_enabled = true; - m_MDS_LOG_DBG( + m_MDS_LOG_NOTIFY( "MDS: TIPC_MCAST_ENABLED: %d Set argument \n", tipc_mcast_enabled); } diff --git a/src/mds/mds_tipc_fctrl_intf.cc b/src/mds/mds_tipc_fctrl_intf.cc index 6271890..e8c9121 100644 --- a/src/mds/mds_tipc_fctrl_intf.cc +++ b/src/mds/mds_tipc_fctrl_intf.cc @@ -39,6 +39,7 @@ using mds::DataMessage; using mds::ChunkAck; using mds::HeaderMessage; using mds::Nack; +using mds::Reset; namespace { // flow control enabled/disabled @@ -124,12 +125,20 @@ uint32_t process_flow_event(const Event& evt) { uint32_t rc = NCSCC_RC_SUCCESS; TipcPortId *portid = portid_lookup(evt.id_); if (portid == nullptr) { + // the null portid normally should not happen; however because the + // tipc_cb.Dsock and tipc_cb.BSRsock are separated; the data message + // sent from BSRsock may come before reception of TIPC_PUBLISHED if (evt.type_ == Event::Type::kEvtRcvData) { portid = new TipcPortId(evt.id_, data_sock_fd, kChunkAckSize, sock_buf_size); portid_map[TipcPortId::GetUniqueId(evt.id_)] = portid; rc = portid->ReceiveData(evt.mseq_, evt.mfrag_, evt.fseq_, evt.svc_id_); + } else if (evt.type_ == Event::Type::kEvtRcvReset) { + portid = new TipcPortId(evt.id_, data_sock_fd, + kChunkAckSize, sock_buf_size); +
portid_map[TipcPortId::GetUniqueId(evt.id_)] = portid; + portid->ReceiveReset(); } else { m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], " "RcvEvt[evt:%d], Error[PortId not found]", @@ -151,6 +160,9 @@ uint32_t process_flow_event(const Event& evt) { portid->ReceiveNack(evt.mseq_, evt.mfrag_, evt.fseq_); } + if (evt.type_ == Event::Type::kEvtRcvReset) { + portid->ReceiveReset(); + } } return rc; } @@ -489,6 +501,16 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t len, m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events, Error[%s]", strerror(errno)); } + } else if (header.msg_type_ == Reset::kResetMsgType) { + // no need to decode reset message + // the decoding reset message type is done in header decoding + // send to the event thread + if (m_NCS_IPC_SEND(&mbx_events, + new Event(Event::Type::kEvtRcvReset, id, 0, 0, 0, 0), + NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) { + m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events, Error[%s]", + strerror(errno)); + } } else { m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], " "[msg_type:%u], Error[not supported message type]", @@ -516,6 +538,11 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t len, portid_map_mutex.unlock(); return rc; } + } else { + m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], " + "Receive non-flow-control data message, " + "header.pro_ver:%u", + id.node, id.ref, header.pro_ver_); } return NCSCC_RC_SUCCESS; } diff --git a/src/mds/mds_tipc_fctrl_msg.cc b/src/mds/mds_tipc_fctrl_msg.cc index 932120f..4aba3fb 100644 --- a/src/mds/mds_tipc_fctrl_msg.cc +++ b/src/mds/mds_tipc_fctrl_msg.cc @@ -178,4 +178,15 @@ void Nack::Decode(uint8_t *msg) { nacked_fseq_ = ncs_decode_16bit(&ptr); } + +void Reset::Encode(uint8_t *msg) { + uint8_t *ptr; + // encode protocol identifier + ptr = &msg[Reset::FieldIndex::kProtocolIdentifier]; + ncs_encode_32bit(&ptr, MDS_PROT_FCTRL_ID); + // encode message type + ptr = &msg[Reset::FieldIndex::kFlowControlMessageType]; + ncs_encode_8bit(&ptr, 
kResetMsgType); +} + } // end namespace mds diff --git a/src/mds/mds_tipc_fctrl_msg.h b/src/mds/mds_tipc_fctrl_msg.h index e1db200..e94fb9d 100644 --- a/src/mds/mds_tipc_fctrl_msg.h +++ b/src/mds/mds_tipc_fctrl_msg.h @@ -45,6 +45,7 @@ class Event { kEvtDropData, // event reported from tipc that a message is not // delivered kEvtRcvNack, // event that received nack message + kEvtRcvReset, // event that received reset message kEvtTmrAll, kEvtTmrTxProb, // event that tx probation timer expired for once kEvtTmrChunkAck, // event to send the chunk ack @@ -172,6 +173,23 @@ class Nack: public BaseMessage { void Decode(uint8_t *msg) override; }; +class Reset: public BaseMessage { + public: + enum FieldIndex { + kProtocolIdentifier = 11, + kFlowControlMessageType = 15, + }; + static const uint8_t kResetMsgType = 3; + static const uint16_t kResetMsgLength = 16; + + uint8_t msg_type_{0}; + + Reset() { msg_type_ = kResetMsgType; } + virtual ~Reset() {} + void Encode(uint8_t *msg) override; +}; + + } // end namespace mds #endif // MDS_MDS_TIPC_FCTRL_MSG_H_ diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc index 24a7e2a..f195b01 100644 --- a/src/mds/mds_tipc_fctrl_portid.cc +++ b/src/mds/mds_tipc_fctrl_portid.cc @@ -112,6 +112,7 @@ TipcPortId::TipcPortId(struct tipc_portid id, int sock, uint16_t chksize, uint64_t sock_buf_size): id_(id), bsrsock_(sock), chunk_size_(chksize), rcv_buf_size_(sock_buf_size) { state_ = State::kStartup; + SendReset(); } TipcPortId::~TipcPortId() { @@ -189,7 +190,7 @@ uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length, sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_); } else { ++sndwnd_.send_; - m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], " + m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], " "QueData[mseq:%u, mfrag:%u, fseq:%u, len:%u], " "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", id_.node, id_.ref, @@ -248,10 +249,22 @@ void TipcPortId::SendNack(uint16_t fseq, uint16_t svc_id) { 
Nack nack(svc_id, fseq); nack.Encode(data); Send(data, Nack::kNackMsgLength); - m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], " + m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], " "SndNack[fseq:%u]", id_.node, id_.ref, fseq); } +void TipcPortId::SendReset() { + uint8_t data[Reset::kResetMsgLength] = {0}; + + HeaderMessage header(Reset::kResetMsgLength, 0, 0, 0); + header.Encode(data); + + Reset reset; + reset.Encode(data); + Send(data, Reset::kResetMsgLength); + m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], " + "SndReset", id_.node, id_.ref); +} uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag, uint16_t fseq, uint16_t svc_id) { @@ -330,8 +343,7 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag, // send nack SendNack((rcvwnd_.rcv_ + Seq16(1)).v(), svc_id); } - } - if (Seq16(fseq) <= rcvwnd_.rcv_) { + } else if (Seq16(fseq) <= rcvwnd_.rcv_) { rc = NCSCC_RC_FAILURE; // unexpected retransmission m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], " @@ -399,7 +411,7 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) { sndwnd_.nacked_space_ += msg->header_.msg_len_; msg->is_sent_ = true; resend_bytes += msg->header_.msg_len_; - m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], " + m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], " "SndQData[fseq:%u, len:%u], " "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", id_.node, id_.ref, @@ -443,7 +455,7 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag, } if (state_ == State::kRcvBuffOverflow) { m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], " - "RcvNack[fseq:%u, state:%u]" + "RcvNack[fseq:%u, state:%u], " "Warning[Ignore Nack]", id_.node, id_.ref, fseq, (uint8_t)state_); @@ -462,7 +474,7 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag, if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) { msg->is_sent_ = true; } - m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], " + m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], 
" "RsndData[mseq:%u, mfrag:%u, fseq:%u], " "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", id_.node, id_.ref, @@ -495,12 +507,11 @@ bool TipcPortId::ReceiveTmrTxProb(uint8_t max_txprob) { // receiver is at old mds version if (state_ == State::kDisabled) { FlushData(); + m_MDS_LOG_NOTIFY("FCTRL: [node:%x, ref:%u], " + "TxProbExp, TxProb[retries:%u, state:%u]", + id_.node, id_.ref, + txprob_cnt_, (uint8_t)state_); } - - m_MDS_LOG_NOTIFY("FCTRL: [node:%x, ref:%u], " - "TxProbExp, TxProb[retries:%u, state:%u]", - id_.node, id_.ref, - txprob_cnt_, (uint8_t)state_); } return restart_txprob; } @@ -518,4 +529,18 @@ void TipcPortId::ReceiveTmrChunkAck() { } } +void TipcPortId::ReceiveReset() { + m_MDS_LOG_NOTIFY("FCTRL: [me] <-- [node:%x, ref:%u], " + "RcvReset, " + "TxProb[retries:%u, state:%u]", + id_.node, id_.ref, + txprob_cnt_, (uint8_t)state_); + if (state_ == State::kStartup || state_ == State::kTxProb) { + state_ = State::kEnabled; + } + rcvwnd_.acked_ = 0; + rcvwnd_.rcv_ = 0; + rcvwnd_.acked_ = 0; +} + } // end namespace mds diff --git a/src/mds/mds_tipc_fctrl_portid.h b/src/mds/mds_tipc_fctrl_portid.h index d238ac6..ffed2ad 100644 --- a/src/mds/mds_tipc_fctrl_portid.h +++ b/src/mds/mds_tipc_fctrl_portid.h @@ -134,11 +134,13 @@ class TipcPortId { void ReceiveChunkAck(uint16_t fseq, uint16_t chunk_size); void SendChunkAck(uint16_t fseq, uint16_t svc_id, uint16_t chunk_size); void SendNack(uint16_t fseq, uint16_t svc_id); + void SendReset(); uint32_t ReceiveData(uint32_t mseq, uint16_t mfrag, uint16_t fseq, uint16_t svc_id); void ReceiveNack(uint32_t mseq, uint16_t mfrag, uint16_t fseq); bool ReceiveTmrTxProb(uint8_t max_txprob); void ReceiveTmrChunkAck(); + void ReceiveReset(); void FlushData(); uint32_t Send(uint8_t* data, uint16_t length); uint32_t Queue(const uint8_t* data, uint16_t length, bool is_sent); -- 2.7.4 _______________________________________________ Opensaf-devel mailing list Opensaf-devel@lists.sourceforge.net 
https://lists.sourceforge.net/lists/listinfo/opensaf-devel