Hi bro.Minh,
- In my understanding, the tx probation timer only starts when the sender
sends its first message.
The sender then relies on a chunkAck to learn that the receiver supports
MDS FCTRL, or on the timer expiring to conclude that it does not.
But from what you describe, did the sender's tx probation timer expire
before it sent its first message?
Or did it expire after the first message was sent, because the sender
somehow never received a chunkAck?
I am confused about this point. Could you help explain?
- About the code: rcvwnd_.acked_ is mistakenly set to 0 twice in
TipcPortId::ReceiveReset().
Best Regards,
ThuanTr
-----Original Message-----
From: Minh Chau <minh.c...@dektech.com.au>
Sent: Friday, October 11, 2019 10:52 AM
To: hans.nordeb...@ericsson.com; gary....@dektech.com.au;
vu.m.ngu...@dektech.com.au; thuan.t...@dektech.com.au
Cc: opensaf-devel@lists.sourceforge.net; Minh Chau
<minh.c...@dektech.com.au>
Subject: [PATCH 1/1] mds: Add Reset message [#3090]
mds relies on data messages sent from the peer to determine whether
MDS_TIPC_FCTRL_ENABLED is set at the peer. A data message may not be sent
right after the TIPC_PUBLISHED event, which can cause the tx probation timer
to time out.
This patch adds a Reset message, which is sent right after TIPC_PUBLISHED,
so that mds can determine earlier whether the peer supports flow control.
---
src/mds/mds_main.c | 2 +-
src/mds/mds_tipc_fctrl_intf.cc | 27 ++++++++++++++++++++++
src/mds/mds_tipc_fctrl_msg.cc | 11 +++++++++
src/mds/mds_tipc_fctrl_msg.h | 18 +++++++++++++++
src/mds/mds_tipc_fctrl_portid.cc | 49
++++++++++++++++++++++++++++++----------
src/mds/mds_tipc_fctrl_portid.h | 2 ++
6 files changed, 96 insertions(+), 13 deletions(-)
diff --git a/src/mds/mds_main.c b/src/mds/mds_main.c index
8c9b1f1..c7d2f7b
100644
--- a/src/mds/mds_main.c
+++ b/src/mds/mds_main.c
@@ -408,7 +408,7 @@ uint32_t mds_lib_req(NCS_LIB_REQ_INFO *req)
if (tipc_mcast_enabled != false)
tipc_mcast_enabled = true;
- m_MDS_LOG_DBG(
+ m_MDS_LOG_NOTIFY(
"MDS: TIPC_MCAST_ENABLED: %d Set argument
\n",
tipc_mcast_enabled);
}
diff --git a/src/mds/mds_tipc_fctrl_intf.cc
b/src/mds/mds_tipc_fctrl_intf.cc index 6271890..e8c9121 100644
--- a/src/mds/mds_tipc_fctrl_intf.cc
+++ b/src/mds/mds_tipc_fctrl_intf.cc
@@ -39,6 +39,7 @@ using mds::DataMessage; using mds::ChunkAck; using
mds::HeaderMessage; using mds::Nack;
+using mds::Reset;
namespace {
// flow control enabled/disabled
@@ -124,12 +125,20 @@ uint32_t process_flow_event(const Event& evt) {
uint32_t rc = NCSCC_RC_SUCCESS;
TipcPortId *portid = portid_lookup(evt.id_);
if (portid == nullptr) {
+ // the null portid normally should not happen; however because the
+ // tipc_cb.Dsock and tipc_cb.BSRsock are separated; the data message
+ // sent from BSRsock may come before reception of TIPC_PUBLISHED
if (evt.type_ == Event::Type::kEvtRcvData) {
portid = new TipcPortId(evt.id_, data_sock_fd,
kChunkAckSize, sock_buf_size);
portid_map[TipcPortId::GetUniqueId(evt.id_)] = portid;
rc = portid->ReceiveData(evt.mseq_, evt.mfrag_,
evt.fseq_, evt.svc_id_);
+ } else if (evt.type_ == Event::Type::kEvtRcvReset) {
+ portid = new TipcPortId(evt.id_, data_sock_fd,
+ kChunkAckSize, sock_buf_size);
+ portid_map[TipcPortId::GetUniqueId(evt.id_)] = portid;
+ portid->ReceiveReset();
} else {
m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
"RcvEvt[evt:%d], Error[PortId not found]", @@ -151,6
+160,9 @@ uint32_t process_flow_event(const Event& evt) {
portid->ReceiveNack(evt.mseq_, evt.mfrag_,
evt.fseq_);
}
+ if (evt.type_ == Event::Type::kEvtRcvReset) {
+ portid->ReceiveReset();
+ }
}
return rc;
}
@@ -489,6 +501,16 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer,
uint16_t len,
m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events,
Error[%s]",
strerror(errno));
}
+ } else if (header.msg_type_ == Reset::kResetMsgType) {
+ // no need to decode reset message
+ // the decoding reset message type is done in header decoding
+ // send to the event thread
+ if (m_NCS_IPC_SEND(&mbx_events,
+ new Event(Event::Type::kEvtRcvReset, id, 0, 0, 0, 0),
+ NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) {
+ m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events,
Error[%s]",
+ strerror(errno));
+ }
} else {
m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
"[msg_type:%u], Error[not supported message type]", @@
-516,6
+538,11 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t
+len,
portid_map_mutex.unlock();
return rc;
}
+ } else {
+ m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
+ "Receive non-flow-control data message, "
+ "header.pro_ver:%u",
+ id.node, id.ref, header.pro_ver_);
}
return NCSCC_RC_SUCCESS;
}
diff --git a/src/mds/mds_tipc_fctrl_msg.cc
b/src/mds/mds_tipc_fctrl_msg.cc index 932120f..4aba3fb 100644
--- a/src/mds/mds_tipc_fctrl_msg.cc
+++ b/src/mds/mds_tipc_fctrl_msg.cc
@@ -178,4 +178,15 @@ void Nack::Decode(uint8_t *msg) {
nacked_fseq_ = ncs_decode_16bit(&ptr); }
+
+// Write the flow-control fields of a Reset message into |msg|: the fctrl
+// protocol identifier (via ncs_encode_32bit) at kProtocolIdentifier and the
+// Reset message type (via ncs_encode_8bit) at kFlowControlMessageType.
+// Only these two fields are written here; the MDS header is encoded
+// separately by the caller.
+void Reset::Encode(uint8_t *msg) {
+  // protocol identifier
+  uint8_t *cursor = &msg[Reset::FieldIndex::kProtocolIdentifier];
+  ncs_encode_32bit(&cursor, MDS_PROT_FCTRL_ID);
+  // flow-control message type
+  cursor = &msg[Reset::FieldIndex::kFlowControlMessageType];
+  ncs_encode_8bit(&cursor, kResetMsgType);
+}
+
} // end namespace mds
diff --git a/src/mds/mds_tipc_fctrl_msg.h
b/src/mds/mds_tipc_fctrl_msg.h index e1db200..e94fb9d 100644
--- a/src/mds/mds_tipc_fctrl_msg.h
+++ b/src/mds/mds_tipc_fctrl_msg.h
@@ -45,6 +45,7 @@ class Event {
kEvtDropData, // event reported from tipc that a message is
not
// delivered
kEvtRcvNack, // event that received nack message
+ kEvtRcvReset, // event that received reset message
kEvtTmrAll,
kEvtTmrTxProb, // event that tx probation timer expired for once
kEvtTmrChunkAck, // event to send the chunk ack @@ -172,6
+173,23 @@ class Nack: public BaseMessage {
void Decode(uint8_t *msg) override;
};
+// Flow-control Reset message. Besides the MDS header it carries only the
+// fctrl protocol identifier and the message type; it has no payload of
+// its own to decode.
+class Reset: public BaseMessage {
+ public:
+  // byte offsets of the encoded fields inside the message buffer
+  enum FieldIndex {
+    kProtocolIdentifier = 11,
+    kFlowControlMessageType = 15,
+  };
+  static const uint8_t kResetMsgType = 3;
+  // total encoded size in bytes, MDS header included
+  static const uint16_t kResetMsgLength = 16;
+
+  uint8_t msg_type_{kResetMsgType};
+
+  Reset() {}
+  virtual ~Reset() {}
+  void Encode(uint8_t *msg) override;
+};
+
+
} // end namespace mds
#endif // MDS_MDS_TIPC_FCTRL_MSG_H_ diff --git
a/src/mds/mds_tipc_fctrl_portid.cc
b/src/mds/mds_tipc_fctrl_portid.cc
index 24a7e2a..f195b01 100644
--- a/src/mds/mds_tipc_fctrl_portid.cc
+++ b/src/mds/mds_tipc_fctrl_portid.cc
@@ -112,6 +112,7 @@ TipcPortId::TipcPortId(struct tipc_portid id, int
sock, uint16_t chksize,
uint64_t sock_buf_size):
id_(id), bsrsock_(sock), chunk_size_(chksize),
rcv_buf_size_(sock_buf_size) {
state_ = State::kStartup;
+ SendReset();
}
TipcPortId::~TipcPortId() {
@@ -189,7 +190,7 @@ uint32_t TipcPortId::Queue(const uint8_t* data,
uint16_t length,
sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
} else {
++sndwnd_.send_;
- m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
+ m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
"QueData[mseq:%u, mfrag:%u, fseq:%u, len:%u], "
"sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
id_.node, id_.ref,
@@ -248,10 +249,22 @@ void TipcPortId::SendNack(uint16_t fseq,
uint16_t
svc_id) {
Nack nack(svc_id, fseq);
nack.Encode(data);
Send(data, Nack::kNackMsgLength);
- m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
+ m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
"SndNack[fseq:%u]", id_.node, id_.ref, fseq); }
+// Build a Reset message (MDS header followed by the Reset fields) and send
+// it to the peer port through Send(). The constructor calls this so that
+// the peer can learn about flow-control support without waiting for data.
+// The result of Send() is not checked; the notify log is written either way.
+void TipcPortId::SendReset() {
+  uint8_t buffer[Reset::kResetMsgLength] = {0};
+  // header first, then the Reset-specific fields on top of it
+  HeaderMessage(Reset::kResetMsgLength, 0, 0, 0).Encode(buffer);
+  Reset().Encode(buffer);
+  Send(buffer, Reset::kResetMsgLength);
+  m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
+      "SndReset", id_.node, id_.ref);
+}
uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
uint16_t fseq, uint16_t svc_id) { @@ -330,8 +343,7 @@ uint32_t
TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
// send nack
SendNack((rcvwnd_.rcv_ + Seq16(1)).v(), svc_id);
}
- }
- if (Seq16(fseq) <= rcvwnd_.rcv_) {
+ } else if (Seq16(fseq) <= rcvwnd_.rcv_) {
rc = NCSCC_RC_FAILURE;
// unexpected retransmission
m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
@@ -399,7 +411,7 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq,
uint16_t
chksize) {
sndwnd_.nacked_space_ += msg->header_.msg_len_;
msg->is_sent_ = true;
resend_bytes += msg->header_.msg_len_;
- m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
+ m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
"SndQData[fseq:%u, len:%u], "
"sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
id_.node, id_.ref,
@@ -443,7 +455,7 @@ void TipcPortId::ReceiveNack(uint32_t mseq,
uint16_t mfrag,
}
if (state_ == State::kRcvBuffOverflow) {
m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
- "RcvNack[fseq:%u, state:%u]"
+ "RcvNack[fseq:%u, state:%u], "
"Warning[Ignore Nack]",
id_.node, id_.ref,
fseq, (uint8_t)state_);
@@ -462,7 +474,7 @@ void TipcPortId::ReceiveNack(uint32_t mseq,
uint16_t mfrag,
if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) {
msg->is_sent_ = true;
}
- m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
+ m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
"RsndData[mseq:%u, mfrag:%u, fseq:%u], "
"sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
id_.node, id_.ref,
@@ -495,12 +507,11 @@ bool TipcPortId::ReceiveTmrTxProb(uint8_t
max_txprob) {
// receiver is at old mds version
if (state_ == State::kDisabled) {
FlushData();
+ m_MDS_LOG_NOTIFY("FCTRL: [node:%x, ref:%u], "
+ "TxProbExp, TxProb[retries:%u, state:%u]",
+ id_.node, id_.ref,
+ txprob_cnt_, (uint8_t)state_);
}
-
- m_MDS_LOG_NOTIFY("FCTRL: [node:%x, ref:%u], "
- "TxProbExp, TxProb[retries:%u, state:%u]",
- id_.node, id_.ref,
- txprob_cnt_, (uint8_t)state_);
}
return restart_txprob;
}
@@ -518,4 +529,18 @@ void TipcPortId::ReceiveTmrChunkAck() {
}
}
+// Handle a Reset message received from the peer port.
+// A port still in kStartup or kTxProb moves to kEnabled, because the Reset
+// shows the peer runs the MDS flow-control protocol. The receive window is
+// then cleared (acked_, rcv_ and the nacked byte counter) so that tracking
+// restarts from zero.
+void TipcPortId::ReceiveReset() {
+  m_MDS_LOG_NOTIFY("FCTRL: [me] <-- [node:%x, ref:%u], "
+      "RcvReset, "
+      "TxProb[retries:%u, state:%u]",
+      id_.node, id_.ref,
+      txprob_cnt_, (uint8_t)state_);
+  if (state_ == State::kStartup || state_ == State::kTxProb) {
+    state_ = State::kEnabled;
+  }
+  rcvwnd_.acked_ = 0;
+  rcvwnd_.rcv_ = 0;
+  // NOTE(review): assumes rcvwnd_ has a nacked_space_ counter mirroring
+  // sndwnd_.nacked_space_ -- confirm in mds_tipc_fctrl_portid.h.
+  rcvwnd_.nacked_space_ = 0;
+}
+
} // end namespace mds
diff --git a/src/mds/mds_tipc_fctrl_portid.h
b/src/mds/mds_tipc_fctrl_portid.h index d238ac6..ffed2ad 100644
--- a/src/mds/mds_tipc_fctrl_portid.h
+++ b/src/mds/mds_tipc_fctrl_portid.h
@@ -134,11 +134,13 @@ class TipcPortId {
void ReceiveChunkAck(uint16_t fseq, uint16_t chunk_size);
void SendChunkAck(uint16_t fseq, uint16_t svc_id, uint16_t chunk_size);
void SendNack(uint16_t fseq, uint16_t svc_id);
+ void SendReset();
uint32_t ReceiveData(uint32_t mseq, uint16_t mfrag,
uint16_t fseq, uint16_t svc_id);
void ReceiveNack(uint32_t mseq, uint16_t mfrag, uint16_t fseq);
bool ReceiveTmrTxProb(uint8_t max_txprob);
void ReceiveTmrChunkAck();
+ void ReceiveReset();
void FlushData();
uint32_t Send(uint8_t* data, uint16_t length);
uint32_t Queue(const uint8_t* data, uint16_t length, bool
is_sent);
--
2.7.4