mds relies on a data message sent from the peer to determine
whether MDS_TIPC_FCTRL_ENABLED is set at the peer. The data message
may not be sent right after the TIPC_PUBLISHED event, which can
cause the tx probation timer to time out.
This patch adds an Intro message, which is sent right after the
TIPC_PUBLISHED event to help mds determine earlier whether flow
control is supported at the peer.
---
src/mds/mds_main.c | 2 +-
src/mds/mds_tipc_fctrl_intf.cc | 27 ++++++++++++++++++++++
src/mds/mds_tipc_fctrl_msg.cc | 11 +++++++++
src/mds/mds_tipc_fctrl_msg.h | 18 +++++++++++++++
src/mds/mds_tipc_fctrl_portid.cc | 49 ++++++++++++++++++++++++++++++----------
src/mds/mds_tipc_fctrl_portid.h | 2 ++
6 files changed, 96 insertions(+), 13 deletions(-)
diff --git a/src/mds/mds_main.c b/src/mds/mds_main.c
index 8c9b1f1..c7d2f7b 100644
--- a/src/mds/mds_main.c
+++ b/src/mds/mds_main.c
@@ -408,7 +408,7 @@ uint32_t mds_lib_req(NCS_LIB_REQ_INFO *req)
if (tipc_mcast_enabled != false)
tipc_mcast_enabled = true;
- m_MDS_LOG_DBG(
+ m_MDS_LOG_NOTIFY(
"MDS: TIPC_MCAST_ENABLED: %d Set argument
\n",
tipc_mcast_enabled);
}
diff --git a/src/mds/mds_tipc_fctrl_intf.cc b/src/mds/mds_tipc_fctrl_intf.cc
index 6271890..b803bfe 100644
--- a/src/mds/mds_tipc_fctrl_intf.cc
+++ b/src/mds/mds_tipc_fctrl_intf.cc
@@ -39,6 +39,7 @@ using mds::DataMessage;
using mds::ChunkAck;
using mds::HeaderMessage;
using mds::Nack;
+using mds::Intro;
namespace {
// flow control enabled/disabled
@@ -124,12 +125,20 @@ uint32_t process_flow_event(const Event& evt) {
uint32_t rc = NCSCC_RC_SUCCESS;
TipcPortId *portid = portid_lookup(evt.id_);
if (portid == nullptr) {
+ // A null portid should not normally happen. However, because
+ // tipc_cb.Dsock and tipc_cb.BSRsock are separate sockets, a data message
+ // sent from BSRsock may arrive before TIPC_PUBLISHED is received.
if (evt.type_ == Event::Type::kEvtRcvData) {
portid = new TipcPortId(evt.id_, data_sock_fd,
kChunkAckSize, sock_buf_size);
portid_map[TipcPortId::GetUniqueId(evt.id_)] = portid;
rc = portid->ReceiveData(evt.mseq_, evt.mfrag_,
evt.fseq_, evt.svc_id_);
+ } else if (evt.type_ == Event::Type::kEvtRcvIntro) {
+ portid = new TipcPortId(evt.id_, data_sock_fd,
+ kChunkAckSize, sock_buf_size);
+ portid_map[TipcPortId::GetUniqueId(evt.id_)] = portid;
+ portid->ReceiveIntro();
} else {
m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
"RcvEvt[evt:%d], Error[PortId not found]",
@@ -151,6 +160,9 @@ uint32_t process_flow_event(const Event& evt) {
portid->ReceiveNack(evt.mseq_, evt.mfrag_,
evt.fseq_);
}
+ if (evt.type_ == Event::Type::kEvtRcvIntro) {
+ portid->ReceiveIntro();
+ }
}
return rc;
}
@@ -489,6 +501,16 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t
len,
m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events, Error[%s]",
strerror(errno));
}
+ } else if (header.msg_type_ == Intro::kIntroMsgType) {
+ // no need to decode intro message
+ // the decoding intro message type is done in header decoding
+ // send to the event thread
+ if (m_NCS_IPC_SEND(&mbx_events,
+ new Event(Event::Type::kEvtRcvIntro, id, 0, 0, 0, 0),
+ NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) {
+ m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events, Error[%s]",
+ strerror(errno));
+ }
} else {
m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
"[msg_type:%u], Error[not supported message type]",
@@ -516,6 +538,11 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t
len,
portid_map_mutex.unlock();
return rc;
}
+ } else {
+ m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
+ "Receive non-flow-control data message, "
+ "header.pro_ver:%u",
+ id.node, id.ref, header.pro_ver_);
}
return NCSCC_RC_SUCCESS;
}
diff --git a/src/mds/mds_tipc_fctrl_msg.cc b/src/mds/mds_tipc_fctrl_msg.cc
index 932120f..180dcb6 100644
--- a/src/mds/mds_tipc_fctrl_msg.cc
+++ b/src/mds/mds_tipc_fctrl_msg.cc
@@ -178,4 +178,15 @@ void Nack::Decode(uint8_t *msg) {
nacked_fseq_ = ncs_decode_16bit(&ptr);
}
+
+// Writes the Intro-specific fields into msg at their FieldIndex offsets.
+void Intro::Encode(uint8_t *msg) {
+  // protocol identifier
+  uint8_t *cursor = &msg[Intro::FieldIndex::kProtocolIdentifier];
+  ncs_encode_32bit(&cursor, MDS_PROT_FCTRL_ID);
+  // flow control message type
+  cursor = &msg[Intro::FieldIndex::kFlowControlMessageType];
+  ncs_encode_8bit(&cursor, kIntroMsgType);
+}
+
} // end namespace mds
diff --git a/src/mds/mds_tipc_fctrl_msg.h b/src/mds/mds_tipc_fctrl_msg.h
index e1db200..3e45fa6 100644
--- a/src/mds/mds_tipc_fctrl_msg.h
+++ b/src/mds/mds_tipc_fctrl_msg.h
@@ -45,6 +45,7 @@ class Event {
kEvtDropData, // event reported from tipc that a message is not
// delivered
kEvtRcvNack, // event that received nack message
+ kEvtRcvIntro, // event that received intro message
kEvtTmrAll,
kEvtTmrTxProb, // event that tx probation timer expired for once
kEvtTmrChunkAck, // event to send the chunk ack
@@ -172,6 +173,23 @@ class Nack: public BaseMessage {
void Decode(uint8_t *msg) override;
};
+// Intro message: Encode() writes only the protocol identifier and the
+// message type; FieldIndex values are the offsets Encode() indexes msg with.
+class Intro: public BaseMessage {
+ public:
+  static const uint8_t kIntroMsgType = 3;
+  static const uint16_t kIntroMsgLength = 16;
+  enum FieldIndex {
+    kProtocolIdentifier = 11,
+    kFlowControlMessageType = 15,
+  };
+  uint8_t msg_type_{0};
+  Intro() : msg_type_(kIntroMsgType) {}
+  virtual ~Intro() {}
+  void Encode(uint8_t *msg) override;
+};
+
+
} // end namespace mds
#endif // MDS_MDS_TIPC_FCTRL_MSG_H_
diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc
index 24a7e2a..5b882c9 100644
--- a/src/mds/mds_tipc_fctrl_portid.cc
+++ b/src/mds/mds_tipc_fctrl_portid.cc
@@ -112,6 +112,7 @@ TipcPortId::TipcPortId(struct tipc_portid id, int sock,
uint16_t chksize,
uint64_t sock_buf_size):
id_(id), bsrsock_(sock), chunk_size_(chksize), rcv_buf_size_(sock_buf_size)
{
state_ = State::kStartup;
+ SendIntro();
}
TipcPortId::~TipcPortId() {
@@ -189,7 +190,7 @@ uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t
length,
sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
} else {
++sndwnd_.send_;
- m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
+ m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
"QueData[mseq:%u, mfrag:%u, fseq:%u, len:%u], "
"sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
id_.node, id_.ref,
@@ -248,10 +249,22 @@ void TipcPortId::SendNack(uint16_t fseq, uint16_t svc_id)
{
Nack nack(svc_id, fseq);
nack.Encode(data);
Send(data, Nack::kNackMsgLength);
- m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
+ m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
"SndNack[fseq:%u]", id_.node, id_.ref, fseq);
}
+// Encodes a header plus Intro message into a local buffer and hands it to Send().
+void TipcPortId::SendIntro() {
+  uint8_t intro_buf[Intro::kIntroMsgLength] = {0};
+  HeaderMessage hdr(Intro::kIntroMsgLength, 0, 0, 0);
+  Intro intro_msg;
+  // header fields first, then the intro fields, into the same buffer
+  hdr.Encode(intro_buf);
+  intro_msg.Encode(intro_buf);
+  Send(intro_buf, Intro::kIntroMsgLength);
+  m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
+      "SndIntro", id_.node, id_.ref);
+}
uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
uint16_t fseq, uint16_t svc_id) {
@@ -330,8 +343,7 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t
mfrag,
// send nack
SendNack((rcvwnd_.rcv_ + Seq16(1)).v(), svc_id);
}
- }
- if (Seq16(fseq) <= rcvwnd_.rcv_) {
+ } else if (Seq16(fseq) <= rcvwnd_.rcv_) {
rc = NCSCC_RC_FAILURE;
// unexpected retransmission
m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
@@ -399,7 +411,7 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t
chksize) {
sndwnd_.nacked_space_ += msg->header_.msg_len_;
msg->is_sent_ = true;
resend_bytes += msg->header_.msg_len_;
- m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
+ m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
"SndQData[fseq:%u, len:%u], "
"sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
id_.node, id_.ref,
@@ -443,7 +455,7 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag,
}
if (state_ == State::kRcvBuffOverflow) {
m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
- "RcvNack[fseq:%u, state:%u]"
+ "RcvNack[fseq:%u, state:%u], "
"Warning[Ignore Nack]",
id_.node, id_.ref,
fseq, (uint8_t)state_);
@@ -462,7 +474,7 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag,
if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) {
msg->is_sent_ = true;
}
- m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
+ m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
"RsndData[mseq:%u, mfrag:%u, fseq:%u], "
"sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
id_.node, id_.ref,
@@ -495,12 +507,11 @@ bool TipcPortId::ReceiveTmrTxProb(uint8_t max_txprob) {
// receiver is at old mds version
if (state_ == State::kDisabled) {
FlushData();
+ m_MDS_LOG_NOTIFY("FCTRL: [node:%x, ref:%u], "
+ "TxProbExp, TxProb[retries:%u, state:%u]",
+ id_.node, id_.ref,
+ txprob_cnt_, (uint8_t)state_);
}
-
- m_MDS_LOG_NOTIFY("FCTRL: [node:%x, ref:%u], "
- "TxProbExp, TxProb[retries:%u, state:%u]",
- id_.node, id_.ref,
- txprob_cnt_, (uint8_t)state_);
}
return restart_txprob;
}
@@ -518,4 +529,18 @@ void TipcPortId::ReceiveTmrChunkAck() {
}
}
+// Handles an Intro message from the peer: a port still in kStartup or
+// kTxProb moves to kEnabled, and the receive window counters are reset.
+void TipcPortId::ReceiveIntro() {
+  m_MDS_LOG_NOTIFY("FCTRL: [me] <-- [node:%x, ref:%u], "
+      "RcvIntro, " "TxProb[retries:%u, state:%u]",
+      id_.node, id_.ref, txprob_cnt_, (uint8_t)state_);
+  if (state_ == State::kStartup || state_ == State::kTxProb) {
+    state_ = State::kEnabled;
+  }
+  // NOTE(review): acked_ was reset twice here; confirm no other field was meant.
+  rcvwnd_.acked_ = 0;
+  rcvwnd_.rcv_ = 0;
+}
+
} // end namespace mds
diff --git a/src/mds/mds_tipc_fctrl_portid.h b/src/mds/mds_tipc_fctrl_portid.h
index d238ac6..bb569f1 100644
--- a/src/mds/mds_tipc_fctrl_portid.h
+++ b/src/mds/mds_tipc_fctrl_portid.h
@@ -134,11 +134,13 @@ class TipcPortId {
void ReceiveChunkAck(uint16_t fseq, uint16_t chunk_size);
void SendChunkAck(uint16_t fseq, uint16_t svc_id, uint16_t chunk_size);
void SendNack(uint16_t fseq, uint16_t svc_id);
+ void SendIntro();
uint32_t ReceiveData(uint32_t mseq, uint16_t mfrag,
uint16_t fseq, uint16_t svc_id);
void ReceiveNack(uint32_t mseq, uint16_t mfrag, uint16_t fseq);
bool ReceiveTmrTxProb(uint8_t max_txprob);
void ReceiveTmrChunkAck();
+ void ReceiveIntro();
void FlushData();
uint32_t Send(uint8_t* data, uint16_t length);
uint32_t Queue(const uint8_t* data, uint16_t length, bool is_sent);