An agent with flow control enabled keeps adding new portids without ever
removing them. Remove the portid when its svc count becomes zero, and handle
portid reset properly: when peer A sees that peer B's portid has been reset,
peer B should accept the fseq(1) message from peer A.
---
src/mds/mds_tipc_fctrl_intf.cc | 6 ++++++
src/mds/mds_tipc_fctrl_portid.cc | 17 ++++++++++++++++-
2 files changed, 22 insertions(+), 1 deletion(-)
diff --git a/src/mds/mds_tipc_fctrl_intf.cc b/src/mds/mds_tipc_fctrl_intf.cc
index f3883ba36..f3504b901 100644
--- a/src/mds/mds_tipc_fctrl_intf.cc
+++ b/src/mds/mds_tipc_fctrl_intf.cc
@@ -428,6 +428,12 @@ uint32_t mds_tipc_fctrl_portid_down(struct tipc_portid id,
uint32_t type) {
portid->svc_cnt_--;
m_MDS_LOG_DBG("FCTRL: Remove svc[node:%x, ref:%u svc_id:%u], svc_cnt:%u",
id.node, id.ref, svc_id, portid->svc_cnt_);
+ if (portid->svc_cnt_ == 0) {
+ delete portid;
+ portid_map.erase(TipcPortId::GetUniqueId(id));
+ m_MDS_LOG_NOTIFY("FCTRL: Remove portid[node:%x, ref:%u]",
+ id.node, id.ref);
+ }
}
portid_map_mutex.unlock();
diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc
index 3562c4a00..57843b6de 100644
--- a/src/mds/mds_tipc_fctrl_portid.cc
+++ b/src/mds/mds_tipc_fctrl_portid.cc
@@ -373,7 +373,7 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t
mfrag,
if (rcvwnd_.rcv_ + Seq16(1) < Seq16(fseq)) {
if (rcvwnd_.rcv_ == 0 && rcvwnd_.acked_ == 0) {
// peer does not realize that this portid reset
- m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
+ m_MDS_LOG_NOTIFY("FCTRL: [me] <-- [node:%x, ref:%u], "
"RcvData[mseq:%u, mfrag:%u, fseq:%u], "
"rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "], "
"Warning[portid reset]",
@@ -381,7 +381,9 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t
mfrag,
mseq, mfrag, fseq,
rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_);
+ SendChunkAck(fseq, svc_id, 1);
rcvwnd_.rcv_ = fseq;
+ rcvwnd_.acked_ = rcvwnd_.rcv_;
} else {
rc = NCSCC_RC_FAILURE;
// msg loss
@@ -395,6 +397,19 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t
mfrag,
// send nack
SendNack((rcvwnd_.rcv_ + Seq16(1)).v(), svc_id);
}
+ } else if (fseq == 1) {
+ // sender realize me as portid reset
+ m_MDS_LOG_NOTIFY("FCTRL: [me] <-- [node:%x, ref:%u], "
+ "RcvData[mseq:%u, mfrag:%u, fseq:%u], "
+ "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "], "
+ "Warning[portid reset on sender]",
+ id_.node, id_.ref,
+ mseq, mfrag, fseq,
+ rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_);
+
+ SendChunkAck(fseq, svc_id, 1);
+ rcvwnd_.rcv_ = fseq;
+ rcvwnd_.acked_ = rcvwnd_.rcv_;
} else if (Seq16(fseq) <= rcvwnd_.rcv_) {
rc = NCSCC_RC_FAILURE;
// unexpected retransmission