- Revert part of the #3151 solution: do not decide that a PortId was
reset based on fseq=1; instead, reset rcvwnd when an Intro msg is
received from a known PortId.
- Skip invalid Nacks (for an already-acked fseq) so the sender does not
mistakenly move to the overflow state and queue all later messages
while the receiver gets no further messages and so never sends a
ChunkAck.
- Update tet_receiver() to keep polling (1 s poll timeout, no overall
deadline) until all expected messages are verified, as the sender may
take a long time to return from sendto() when it runs out of memory.
- Update tet_sender() to slow down sending when both the number of
messages and the message size are large, to avoid the kernel OOM
killer terminating it for excessive memory usage.
- Do not return an error if the PortId is not found when checking
whether the send queue is capable, to avoid an agent crash after
fix #3208 when the agent enables MDS flow control.
---
src/mds/apitest/mdstipc_api.c | 15 +++++--------
src/mds/mds_tipc_fctrl_intf.cc | 23 +++++--------------
src/mds/mds_tipc_fctrl_portid.cc | 38 +++++++++++++++++++-------------
3 files changed, 34 insertions(+), 42 deletions(-)
diff --git a/src/mds/apitest/mdstipc_api.c b/src/mds/apitest/mdstipc_api.c
index 3dd1a3dc0..641753d7a 100644
--- a/src/mds/apitest/mdstipc_api.c
+++ b/src/mds/apitest/mdstipc_api.c
@@ -13413,6 +13413,8 @@ void tet_sender(MDS_SVC_ID svc_id, uint32_t msg_num,
uint32_t msg_size,
" successfully\n", i);
}
}
+ if (msg_num > 65535 && msg_size > 10000)
+ usleep(1000); // Large run (>65535 msgs of >10000 bytes): sleep 1 ms to avoid being reaped by the OOM killer
}
free(mesg);
while (1) {
@@ -13459,7 +13461,7 @@ int tet_receiver(MDS_SVC_ID svc_id, uint32_t msg_num,
sel.fd = m_GET_FD_FROM_SEL_OBJ(gl_tet_adest.svc[0].sel_obj);
sel.events = POLLIN;
while (1) {
- int ret = osaf_poll(&sel, 1, 10000);
+ int ret = osaf_poll(&sel, 1, 1000);  // 1 s poll timeout (was 10 s)
if (ret > 0) {
gl_rcvdmsginfo.msg = NULL;
if (mds_service_retrieve(gl_tet_adest.mds_pwe1_hdl,
@@ -13486,19 +13488,12 @@ int tet_receiver(MDS_SVC_ID svc_id, uint32_t msg_num,
}
free(msg);
}
- } else {
+ } else if (verify_counters(msg_num)) {  // osaf_poll() returned <= 0: leave the loop only once all msg_num messages are verified
+ printf("\nReceiver: get enough %d messages\n", msg_num);
break;
}
}
- printf("\nReceiver verify number of received messages\n");
- if (!verify_counters(msg_num)) {
- printf("\nReceiver: Not get enough %d messages\n", msg_num);
- free(expected_buff);
- reset_counters();
- return 1;
- }
-
printf("\nEnd Receiver (pid:%d) svc_id=%d\n",
(int)getpid(), svc_id);
free(expected_buff);
diff --git a/src/mds/mds_tipc_fctrl_intf.cc b/src/mds/mds_tipc_fctrl_intf.cc
index 93bfce51c..348605c67 100644
--- a/src/mds/mds_tipc_fctrl_intf.cc
+++ b/src/mds/mds_tipc_fctrl_intf.cc
@@ -351,26 +351,17 @@ uint32_t mds_tipc_fctrl_sndqueue_capable(struct
tipc_portid id,
uint16_t* next_seq) {
if (is_fctrl_enabled == false) return NCSCC_RC_SUCCESS;
- uint32_t rc = NCSCC_RC_SUCCESS;
-
portid_map_mutex.lock();
TipcPortId *portid = portid_lookup(id);
- if (portid == nullptr) {
- m_MDS_LOG_ERR("FCTRL: [me] --> [node:%x, ref:%u], "
- "[line:%u], Error[PortId not found]",
- id.node, id.ref, __LINE__);
- rc = NCSCC_RC_FAILURE;
- } else {
- if (portid->state_ != TipcPortId::State::kDisabled) {
+ if (portid && portid->state_ != TipcPortId::State::kDisabled) {  // an unknown portid is no longer an error
// assign the sequence number of the outgoing message
*next_seq = portid->GetCurrentSeq();
- }
}
portid_map_mutex.unlock();
- return rc;
+ return NCSCC_RC_SUCCESS;  // *next_seq is left untouched if portid is unknown or disabled
}
uint32_t mds_tipc_fctrl_trysend(struct tipc_portid id, const uint8_t *buffer,
@@ -564,12 +555,10 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer,
uint16_t len,
// no need to decode intro message
// the decoding intro message type is done in header decoding
- // send to the event thread
- pevt = new Event(Event::Type::kEvtRcvIntro, id, 0, 0, 0, 0);
- if (m_NCS_IPC_SEND(&mbx_events, pevt,
- NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) {
- m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events, Error[%s]",
- strerror(errno));
- }
+ // process the intro event directly under portid_map_mutex
+ portid_map_mutex.lock();
+ Event evt(Event::Type::kEvtRcvIntro, id, 0, 0, 0, 0);
+ process_flow_event(evt);
+ portid_map_mutex.unlock();
} else {
m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
"[msg_type:%u], Error[not supported message type]",
diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc
index 41fce3df8..f569e1f99 100644
--- a/src/mds/mds_tipc_fctrl_portid.cc
+++ b/src/mds/mds_tipc_fctrl_portid.cc
@@ -373,10 +373,10 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t
mfrag,
if (rcvwnd_.rcv_ + 1 < Seq16(fseq)) {
if (rcvwnd_.rcv_ == 0 && rcvwnd_.acked_ == 0) {
// peer does not realize that this portid reset
- m_MDS_LOG_NOTIFY("FCTRL: [me] <-- [node:%x, ref:%u], "
+ m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
"RcvData[mseq:%u, mfrag:%u, fseq:%u], "
"rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "], "
- "Warning[portid reset]",
+ "[portid reset]",
id_.node, id_.ref,
mseq, mfrag, fseq,
rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_);
@@ -397,19 +397,6 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t
mfrag,
// send nack
SendNack((rcvwnd_.rcv_ + 1).v(), svc_id);
}
- } else if (fseq == 1) {
- // sender realize me as portid reset
- m_MDS_LOG_NOTIFY("FCTRL: [me] <-- [node:%x, ref:%u], "
- "RcvData[mseq:%u, mfrag:%u, fseq:%u], "
- "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "], "
- "Warning[portid reset on sender]",
- id_.node, id_.ref,
- mseq, mfrag, fseq,
- rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_);
-
- SendChunkAck(fseq, svc_id, 1);
- rcvwnd_.rcv_ = Seq16(fseq);
- rcvwnd_.acked_ = rcvwnd_.rcv_;
} else if (Seq16(fseq) <= rcvwnd_.rcv_) {
rc = NCSCC_RC_FAILURE;
// unexpected retransmission
@@ -509,6 +496,17 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag,
fseq);
return;
}
+
+ if (Seq16(fseq) <= sndwnd_.acked_) {  // Nack for an already-acked seq is invalid: log and ignore it
+ m_MDS_LOG_NOTIFY("FCTRL: [me] <-- [node:%x, ref:%u], "
+ "RcvNack[fseq:%u], "
+ "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "], "
+ "Warning[Invalid Nack]",
+ id_.node, id_.ref, fseq,
+ sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
+ return;
+ }
+
if (state_ == State::kRcvBuffOverflow) {
sndqueue_.MarkUnsentFrom(Seq16(fseq));
if (Seq16(fseq) - sndwnd_.acked_ > 1) {
@@ -606,6 +604,17 @@ void TipcPortId::ReceiveIntro() {
if (state_ == State::kStartup || state_ == State::kTxProb) {
ChangeState(State::kEnabled);
}
+ if (rcvwnd_.rcv_.v() != 0 || rcvwnd_.acked_.v() != 0) {
+ // Intro from a known peer: its portid was reset, so restart rcvwnd.
+ // Compare raw values: Seq16 ordering may be wrap-aware, where rcv_ >= 32768 is not '> 0'.
+ m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
+ "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "], "
+ "[portid reset on sender]",
+ id_.node, id_.ref,
+ rcvwnd_.acked_.v(), rcvwnd_.rcv_.v(), rcvwnd_.nacked_space_);
+ rcvwnd_.rcv_ = Seq16(0);
+ rcvwnd_.acked_ = rcvwnd_.rcv_;
+ }
}
} // end namespace mds
--
2.17.1
_______________________________________________
Opensaf-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/opensaf-devel