Re: [devel] [PATCH 1/1] mds: Fix mds flow control keep all messages in queue [#3123]
Hi Thuan, ack with comments. Thanks Minh On 28/11/19 6:55 pm, thuan.tran wrote: When overflow happens, mds with flow control enabled may keep all messages in queue if it fails to send a message when receiving Nack or ChunkAck since no more trigger come after that. MDS flow control should retry to send message in this scenario. --- src/mds/mds_tipc_fctrl_portid.cc | 47 ++-- 1 file changed, 32 insertions(+), 15 deletions(-) diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc index 316e1ba75..d5314d5bc 100644 --- a/src/mds/mds_tipc_fctrl_portid.cc +++ b/src/mds/mds_tipc_fctrl_portid.cc @@ -17,6 +17,7 @@ #include "mds/mds_tipc_fctrl_portid.h" #include "base/ncssysf_def.h" +#include "base/osaf_time.h" #include "mds/mds_dt.h" #include "mds/mds_log.h" @@ -149,23 +150,24 @@ void TipcPortId::FlushData() { uint32_t TipcPortId::Send(uint8_t* data, uint16_t length) { struct sockaddr_tipc server_addr; - ssize_t send_len = 0; - uint32_t rc = NCSCC_RC_SUCCESS; - memset(_addr, 0, sizeof(server_addr)); server_addr.family = AF_TIPC; server_addr.addrtype = TIPC_ADDR_ID; server_addr.addr.id = id_; - send_len = sendto(bsrsock_, data, length, 0, -(struct sockaddr *)_addr, sizeof(server_addr)); - - if (send_len == length) { -rc = NCSCC_RC_SUCCESS; - } else { -m_MDS_LOG_ERR("FCTRL: sendto() failed, Error[%s]", strerror(errno)); -rc = NCSCC_RC_FAILURE; + int retry = 5; + while (retry >= 0) { +ssize_t send_len = sendto(bsrsock_, data, length, 0, + (struct sockaddr *)_addr, sizeof(server_addr)); + +if (send_len == length) { + return NCSCC_RC_SUCCESS; +} else if (retry-- > 0) { + assert(errno == ENOMEM || errno == ENOBUFS); + osaf_nanosleep(); +} } [Minh] It might be a good thing to make a wrapper of sendto(), since the sendto() is currently called in fctrl_portid.cc and mds_dt_tipc.c. So we only call the wrapper of sendto(), which handles the error code of sendto(). 
I think EINTR is the only error code that needs to be checked; there are a few places in opensaf that handle the error code of sendto(), which we can take as reference. - return rc; + m_MDS_LOG_ERR("FCTRL: sendto() failed, Error[%s]", strerror(errno)); + return NCSCC_RC_FAILURE; } uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length, @@ -440,13 +442,16 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) { // try to send a few pending msg DataMessage* msg = nullptr; uint16_t send_msg_cnt = 0; -while (send_msg_cnt++ < chunk_size_) { +int retry = 0; +while (send_msg_cnt < chunk_size_) { // find the lowest sequence unsent yet msg = sndqueue_.FirstUnsent(); if (msg == nullptr) { break; } else { if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) { +retry = 0; +send_msg_cnt++; msg->is_sent_ = true; m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], " "SndQData[fseq:%u, len:%u], " @@ -454,6 +459,12 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) { id_.node, id_.ref, msg->header_.fseq_, msg->header_.msg_len_, sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_); + } else if (send_msg_cnt == 0) { +// If not retry, all messages are kept in queue +// and no more trigger to send messages +retry++; +assert(retry < 100); +continue; [Minh] We can accept the use of the assert for now, and 100 should be defined as a constant. But I do think we need a fallback mechanism: if the socket fd is not able to send data, we can terminate the portid and trigger an MDS_DOWN event, ... and this could be looked at in another ticket. Also, the patch title does not seem to be right in the context of this ticket, where we have the problem of "Cannot allocate memory": we might not be able to send any more messages (not that all messages are kept in the queue) and hit the assert. We can say "Add retry for tipc sendto()", or you may have a better description for it. 
} else { break; } @@ -508,9 +519,15 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag, DataMessage* msg = sndqueue_.Find(Seq16(fseq)); if (msg != nullptr) { // Resend the msg found -if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) { - msg->is_sent_ = true; +int retry = 0; +while (Send(msg->msg_data_, msg->header_.msg_len_) != NCSCC_RC_SUCCESS) { + // If not retry, all messages are kept in queue + // and no more trigger to send messages + retry++; + assert(retry < 100); + continue; } +msg->is_sent_ = true; m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], " "RsndData[mseq:%u, mfrag:%u, fseq:%u], "
[devel] [PATCH 1/1] mds: Fix mds flow control keep all messages in queue [#3123]
When overflow happens, mds with flow control enabled may keep all messages in queue if it fails to send a message when receiving Nack or ChunkAck since no more trigger come after that. MDS flow control should retry to send message in this scenario. --- src/mds/mds_tipc_fctrl_portid.cc | 47 ++-- 1 file changed, 32 insertions(+), 15 deletions(-) diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc index 316e1ba75..d5314d5bc 100644 --- a/src/mds/mds_tipc_fctrl_portid.cc +++ b/src/mds/mds_tipc_fctrl_portid.cc @@ -17,6 +17,7 @@ #include "mds/mds_tipc_fctrl_portid.h" #include "base/ncssysf_def.h" +#include "base/osaf_time.h" #include "mds/mds_dt.h" #include "mds/mds_log.h" @@ -149,23 +150,24 @@ void TipcPortId::FlushData() { uint32_t TipcPortId::Send(uint8_t* data, uint16_t length) { struct sockaddr_tipc server_addr; - ssize_t send_len = 0; - uint32_t rc = NCSCC_RC_SUCCESS; - memset(_addr, 0, sizeof(server_addr)); server_addr.family = AF_TIPC; server_addr.addrtype = TIPC_ADDR_ID; server_addr.addr.id = id_; - send_len = sendto(bsrsock_, data, length, 0, -(struct sockaddr *)_addr, sizeof(server_addr)); - - if (send_len == length) { -rc = NCSCC_RC_SUCCESS; - } else { -m_MDS_LOG_ERR("FCTRL: sendto() failed, Error[%s]", strerror(errno)); -rc = NCSCC_RC_FAILURE; + int retry = 5; + while (retry >= 0) { +ssize_t send_len = sendto(bsrsock_, data, length, 0, + (struct sockaddr *)_addr, sizeof(server_addr)); + +if (send_len == length) { + return NCSCC_RC_SUCCESS; +} else if (retry-- > 0) { + assert(errno == ENOMEM || errno == ENOBUFS); + osaf_nanosleep(); +} } - return rc; + m_MDS_LOG_ERR("FCTRL: sendto() failed, Error[%s]", strerror(errno)); + return NCSCC_RC_FAILURE; } uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length, @@ -440,13 +442,16 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) { // try to send a few pending msg DataMessage* msg = nullptr; uint16_t send_msg_cnt = 0; -while (send_msg_cnt++ < chunk_size_) { +int 
retry = 0; +while (send_msg_cnt < chunk_size_) { // find the lowest sequence unsent yet msg = sndqueue_.FirstUnsent(); if (msg == nullptr) { break; } else { if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) { +retry = 0; +send_msg_cnt++; msg->is_sent_ = true; m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], " "SndQData[fseq:%u, len:%u], " @@ -454,6 +459,12 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) { id_.node, id_.ref, msg->header_.fseq_, msg->header_.msg_len_, sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_); + } else if (send_msg_cnt == 0) { +// If not retry, all messages are kept in queue +// and no more trigger to send messages +retry++; +assert(retry < 100); +continue; } else { break; } @@ -508,9 +519,15 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag, DataMessage* msg = sndqueue_.Find(Seq16(fseq)); if (msg != nullptr) { // Resend the msg found -if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) { - msg->is_sent_ = true; +int retry = 0; +while (Send(msg->msg_data_, msg->header_.msg_len_) != NCSCC_RC_SUCCESS) { + // If not retry, all messages are kept in queue + // and no more trigger to send messages + retry++; + assert(retry < 100); + continue; } +msg->is_sent_ = true; m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], " "RsndData[mseq:%u, mfrag:%u, fseq:%u], " "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", -- 2.17.1 ___ Opensaf-devel mailing list Opensaf-devel@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/opensaf-devel
Re: [devel] [PATCH 1/1] mds: Fix mds flow control keep all messages in queue [#3123]
Hi Vu, Thanks. See my reply inline. Best Regards, ThuanTr -Original Message- From: Nguyen Minh Vu Sent: Thursday, November 28, 2019 10:36 AM To: thuan.tran ; 'Minh Hon Chau' ; 'thang . d . nguyen' ; gary@dektech.com.au Cc: opensaf-devel@lists.sourceforge.net Subject: Re: [PATCH 1/1] mds: Fix mds flow control keep all messages in queue [#3123] Hi Thuan, Ack with comments inline. Regards, Vu On 11/27/19 6:33 PM, thuan.tran wrote: > When overflow happens, mds with flow control enabled may keep > all messages in queue if it fails to send a message when receiving > Nack or ChunkAck since no more trigger come after that. > MDS flow control should retry to send message in this scenario. > --- > src/mds/mds_tipc_fctrl_portid.cc | 39 +++- > 1 file changed, 23 insertions(+), 16 deletions(-) > > diff --git a/src/mds/mds_tipc_fctrl_portid.cc > b/src/mds/mds_tipc_fctrl_portid.cc > index 316e1ba75..8081e8bd4 100644 > --- a/src/mds/mds_tipc_fctrl_portid.cc > +++ b/src/mds/mds_tipc_fctrl_portid.cc > @@ -17,6 +17,7 @@ > > #include "mds/mds_tipc_fctrl_portid.h" > #include "base/ncssysf_def.h" > +#include "base/osaf_time.h" > > #include "mds/mds_dt.h" > #include "mds/mds_log.h" > @@ -149,23 +150,23 @@ void TipcPortId::FlushData() { > > uint32_t TipcPortId::Send(uint8_t* data, uint16_t length) { > struct sockaddr_tipc server_addr; > - ssize_t send_len = 0; > - uint32_t rc = NCSCC_RC_SUCCESS; > - > memset(_addr, 0, sizeof(server_addr)); > server_addr.family = AF_TIPC; > server_addr.addrtype = TIPC_ADDR_ID; > server_addr.addr.id = id_; > - send_len = sendto(bsrsock_, data, length, 0, > -(struct sockaddr *)_addr, sizeof(server_addr)); > - > - if (send_len == length) { > -rc = NCSCC_RC_SUCCESS; > - } else { > -m_MDS_LOG_ERR("FCTRL: sendto() failed, Error[%s]", strerror(errno)); > -rc = NCSCC_RC_FAILURE; > + int retry = 5; > + while (retry >= 0) { > +ssize_t send_len = sendto(bsrsock_, data, length, 0, > + (struct sockaddr *)_addr, sizeof(server_addr)); > + [Vu] Any case the sendto just 
sends a part of data? If so, the retry, if any, should not start from the beginning of the data. The code below shows what I meant: ssize_t byte_sent = 0; while (retry--) { ssize_t send_len = sendto(bsrsock_, data + byte_sent, length - byte_sent, 0, (struct sockaddr *)&server_addr, sizeof(server_addr)); if (send_len == -1) { // error handling here if (errno == EINTR) continue; // error, can't continue. should log something here? return NCSCC_RC_FAILURE; // or assert? } // number of bytes sent byte_sent += send_len; if (byte_sent >= length) { return NCSCC_RC_SUCCESS; } // retry but do we need to sleep here? osaf_nanosleep(); } [Thuan]: I think there is no case where only a part of the message is sent. Even if there were, the incomplete message would not be accepted by the receiver. The receiver does not reassemble an unfragmented message. > +if (send_len == length) { > + return NCSCC_RC_SUCCESS; > +} else if (retry-- > 0) { > + osaf_nanosleep(); > +} > } > - return rc; > + m_MDS_LOG_ERR("FCTRL: sendto() failed, Error[%s]", strerror(errno)); > + return NCSCC_RC_FAILURE; > } > > uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length, > @@ -440,13 +441,14 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, > uint16_t chksize) { > // try to send a few pending msg > DataMessage* msg = nullptr; > uint16_t send_msg_cnt = 0; > -while (send_msg_cnt++ < chunk_size_) { > +while (send_msg_cnt < chunk_size_) { > // find the lowest sequence unsent yet > msg = sndqueue_.FirstUnsent(); > if (msg == nullptr) { > break; > } else { > if (Send(msg->msg_data_, msg->header_.msg_len_) == > NCSCC_RC_SUCCESS) { > +send_msg_cnt++; > msg->is_sent_ = true; > m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], " > "SndQData[fseq:%u, len:%u], " > @@ -455,7 +457,9 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t > chksize) { > msg->header_.fseq_, msg->header_.msg_len_, > sndwnd_.acked_.v(), sndwnd_.send_.v(), > sndwnd_.nacked_space_); > } else { > -break; > +// If not retry, all messages are kept in queue > +// and no more 
trigger to send messages > +continue; [Vu] If send is constantly failed, this loop has no way to exit? [Thuan] Yes > } > } > } > @@ -508,9 +512,12 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t > mfrag, > DataMessage* msg = sndqueue_.Find(Seq16(fseq)); > if (msg != nullptr) { > // Resend the msg found > -if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) { > - msg->is_sent_ = true; > +while (Send(msg->msg_data_,
Re: [devel] [PATCH 1/1] mds: Fix mds flow control keep all messages in queue [#3123]
Hi Thuan, Ack with comments inline. Regards, Vu On 11/27/19 6:33 PM, thuan.tran wrote: When overflow happens, mds with flow control enabled may keep all messages in queue if it fails to send a message when receiving Nack or ChunkAck since no more trigger come after that. MDS flow control should retry to send message in this scenario. --- src/mds/mds_tipc_fctrl_portid.cc | 39 +++- 1 file changed, 23 insertions(+), 16 deletions(-) diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc index 316e1ba75..8081e8bd4 100644 --- a/src/mds/mds_tipc_fctrl_portid.cc +++ b/src/mds/mds_tipc_fctrl_portid.cc @@ -17,6 +17,7 @@ #include "mds/mds_tipc_fctrl_portid.h" #include "base/ncssysf_def.h" +#include "base/osaf_time.h" #include "mds/mds_dt.h" #include "mds/mds_log.h" @@ -149,23 +150,23 @@ void TipcPortId::FlushData() { uint32_t TipcPortId::Send(uint8_t* data, uint16_t length) { struct sockaddr_tipc server_addr; - ssize_t send_len = 0; - uint32_t rc = NCSCC_RC_SUCCESS; - memset(_addr, 0, sizeof(server_addr)); server_addr.family = AF_TIPC; server_addr.addrtype = TIPC_ADDR_ID; server_addr.addr.id = id_; - send_len = sendto(bsrsock_, data, length, 0, -(struct sockaddr *)_addr, sizeof(server_addr)); - - if (send_len == length) { -rc = NCSCC_RC_SUCCESS; - } else { -m_MDS_LOG_ERR("FCTRL: sendto() failed, Error[%s]", strerror(errno)); -rc = NCSCC_RC_FAILURE; + int retry = 5; + while (retry >= 0) { +ssize_t send_len = sendto(bsrsock_, data, length, 0, + (struct sockaddr *)_addr, sizeof(server_addr)); + [Vu] Any case the sendto just sends a part of data? if so, the retry if any should not start from the beginning of data. the below code shows what i meant: ssize_t byte_sent = 0; while (retry--) { ssize_t send_len = sendto(bsrsock_, data + byte_sent, length - byte_sent, 0, (struct sockaddr *)_addr, sizeof(server_addr)); if (send_lenn == -1) { // error handling here if (errno == EINTR) continue; // error, can't continue. should log something here? 
return NCSCC_RC_FAILURE; // or assert? } // number of bytes sent byte_sent += send_data; if (byte_sent >= length) { return NCSCC_RC_SUCCESS; } // retry but do we need to sleep here? osaf_nanosleep(); } +if (send_len == length) { + return NCSCC_RC_SUCCESS; +} else if (retry-- > 0) { + osaf_nanosleep(); +} } - return rc; + m_MDS_LOG_ERR("FCTRL: sendto() failed, Error[%s]", strerror(errno)); + return NCSCC_RC_FAILURE; } uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length, @@ -440,13 +441,14 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) { // try to send a few pending msg DataMessage* msg = nullptr; uint16_t send_msg_cnt = 0; -while (send_msg_cnt++ < chunk_size_) { +while (send_msg_cnt < chunk_size_) { // find the lowest sequence unsent yet msg = sndqueue_.FirstUnsent(); if (msg == nullptr) { break; } else { if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) { +send_msg_cnt++; msg->is_sent_ = true; m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], " "SndQData[fseq:%u, len:%u], " @@ -455,7 +457,9 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) { msg->header_.fseq_, msg->header_.msg_len_, sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_); } else { -break; +// If not retry, all messages are kept in queue +// and no more trigger to send messages +continue; [Vu] If send is constantly failed, this loop has no way to exit? } } } @@ -508,9 +512,12 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag, DataMessage* msg = sndqueue_.Find(Seq16(fseq)); if (msg != nullptr) { // Resend the msg found -if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) { - msg->is_sent_ = true; +while (Send(msg->msg_data_, msg->header_.msg_len_) != NCSCC_RC_SUCCESS) { + // If not retry, all messages are kept in queue + // and no more trigger to send messages + continue; [Vu] If send is constantly failed, this loop has no way to exit? 
} +msg->is_sent_ = true; m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], " "RsndData[mseq:%u, mfrag:%u, fseq:%u], " "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", ___ Opensaf-devel mailing list Opensaf-devel@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/opensaf-devel
[devel] [PATCH 1/1] mds: Fix mds flow control keep all messages in queue [#3123]
When overflow happens, mds with flow control enabled may keep all messages in queue if it fails to send a message when receiving Nack or ChunkAck since no more trigger come after that. MDS flow control should retry to send message in this scenario. --- src/mds/mds_tipc_fctrl_portid.cc | 39 +++- 1 file changed, 23 insertions(+), 16 deletions(-) diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc index 316e1ba75..8081e8bd4 100644 --- a/src/mds/mds_tipc_fctrl_portid.cc +++ b/src/mds/mds_tipc_fctrl_portid.cc @@ -17,6 +17,7 @@ #include "mds/mds_tipc_fctrl_portid.h" #include "base/ncssysf_def.h" +#include "base/osaf_time.h" #include "mds/mds_dt.h" #include "mds/mds_log.h" @@ -149,23 +150,23 @@ void TipcPortId::FlushData() { uint32_t TipcPortId::Send(uint8_t* data, uint16_t length) { struct sockaddr_tipc server_addr; - ssize_t send_len = 0; - uint32_t rc = NCSCC_RC_SUCCESS; - memset(_addr, 0, sizeof(server_addr)); server_addr.family = AF_TIPC; server_addr.addrtype = TIPC_ADDR_ID; server_addr.addr.id = id_; - send_len = sendto(bsrsock_, data, length, 0, -(struct sockaddr *)_addr, sizeof(server_addr)); - - if (send_len == length) { -rc = NCSCC_RC_SUCCESS; - } else { -m_MDS_LOG_ERR("FCTRL: sendto() failed, Error[%s]", strerror(errno)); -rc = NCSCC_RC_FAILURE; + int retry = 5; + while (retry >= 0) { +ssize_t send_len = sendto(bsrsock_, data, length, 0, + (struct sockaddr *)_addr, sizeof(server_addr)); + +if (send_len == length) { + return NCSCC_RC_SUCCESS; +} else if (retry-- > 0) { + osaf_nanosleep(); +} } - return rc; + m_MDS_LOG_ERR("FCTRL: sendto() failed, Error[%s]", strerror(errno)); + return NCSCC_RC_FAILURE; } uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length, @@ -440,13 +441,14 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) { // try to send a few pending msg DataMessage* msg = nullptr; uint16_t send_msg_cnt = 0; -while (send_msg_cnt++ < chunk_size_) { +while (send_msg_cnt < chunk_size_) { // find the 
lowest sequence unsent yet msg = sndqueue_.FirstUnsent(); if (msg == nullptr) { break; } else { if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) { +send_msg_cnt++; msg->is_sent_ = true; m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], " "SndQData[fseq:%u, len:%u], " @@ -455,7 +457,9 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) { msg->header_.fseq_, msg->header_.msg_len_, sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_); } else { -break; +// If not retry, all messages are kept in queue +// and no more trigger to send messages +continue; } } } @@ -508,9 +512,12 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag, DataMessage* msg = sndqueue_.Find(Seq16(fseq)); if (msg != nullptr) { // Resend the msg found -if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) { - msg->is_sent_ = true; +while (Send(msg->msg_data_, msg->header_.msg_len_) != NCSCC_RC_SUCCESS) { + // If not retry, all messages are kept in queue + // and no more trigger to send messages + continue; } +msg->is_sent_ = true; m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], " "RsndData[mseq:%u, mfrag:%u, fseq:%u], " "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", -- 2.17.1 ___ Opensaf-devel mailing list Opensaf-devel@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/opensaf-devel
Re: [devel] [PATCH 1/1] mds: Fix mds flow control keep all messages in queue [#3123]
But if all retries still fail, we might need to terminate the portid, which leads to an MDS DOWN event, but let's look at that later. On 27/11/19 3:23 pm, Minh Hon Chau wrote: Hi Thuan, I'm thinking of retrying 3 times with 100 ms in between, but you can decide. Also, we need to ensure that the retry does not block the mds main receiving thread (on the flow of processing data). The retry in this patch is ok since it retries on the mds flow control thread, so it does not delay the mds main receiving thread. Thanks Minh On 27/11/19 2:40 pm, Tran Thuan wrote: Hi Minh, I think it's good to retry a few times for normal Send(). Do you have any idea how many retries? What interval between tries? Best Regards, ThuanTr -Original Message- From: Minh Hon Chau Sent: Wednesday, November 27, 2019 10:30 AM To: thuan.tran ; thang . d . nguyen ; 'Nguyen Minh Vu' ; gary@dektech.com.au Cc: opensaf-devel@lists.sourceforge.net Subject: Re: [PATCH 1/1] mds: Fix mds flow control keep all messages in queue [#3123] Hi Thuan, TipcPortId::Send is also called in a few other places; do you think it would be good to make a wrapper of TipcPortId::Send with a few retries on failure, say TipcPortId::TryToSend(), and call TryToSend() instead of Send()? Thanks Minh On 27/11/19 1:26 pm, thuan.tran wrote: When overflow happens, mds with flow control enabled may keep all messages in queue if it fails to send a message when receiving Nack or ChunkAck, since no further trigger comes after that. MDS flow control should retry sending the message in this scenario. 
--- src/mds/mds_tipc_fctrl_portid.cc | 16 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc index 724eb7b7b..e6e179669 100644 --- a/src/mds/mds_tipc_fctrl_portid.cc +++ b/src/mds/mds_tipc_fctrl_portid.cc @@ -17,6 +17,7 @@ #include "mds/mds_tipc_fctrl_portid.h" #include "base/ncssysf_def.h" +#include "base/osaf_time.h" #include "mds/mds_dt.h" #include "mds/mds_log.h" @@ -440,13 +441,14 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) { // try to send a few pending msg DataMessage* msg = nullptr; uint16_t send_msg_cnt = 0; - while (send_msg_cnt++ < chunk_size_) { + while (send_msg_cnt < chunk_size_) { // find the lowest sequence unsent yet msg = sndqueue_.FirstUnsent(); if (msg == nullptr) { break; } else { if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) { + send_msg_cnt++; msg->is_sent_ = true; m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], " "SndQData[fseq:%u, len:%u], " @@ -455,7 +457,10 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) { msg->header_.fseq_, msg->header_.msg_len_, sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_); } else { - break; + // If not retry, all messages are kept in queue + // and no more trigger to send messages + osaf_nanosleep(); + continue; } } } @@ -508,9 +513,12 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag, DataMessage* msg = sndqueue_.Find(Seq16(fseq)); if (msg != nullptr) { // Resend the msg found - if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) { - msg->is_sent_ = true; + while (Send(msg->msg_data_, msg->header_.msg_len_) != NCSCC_RC_SUCCESS) { + // If not retry, all messages are kept in queue + // and no more trigger to send messages + osaf_nanosleep(); } + msg->is_sent_ = true; m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], " "RsndData[mseq:%u, mfrag:%u, fseq:%u], " "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", ___ 
Opensaf-devel mailing list Opensaf-devel@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/opensaf-devel
Re: [devel] [PATCH 1/1] mds: Fix mds flow control keep all messages in queue [#3123]
Hi Thuan, I'm thinking to retry 3 times with 100 ms in between, but you can decide it. Also, we need to ensure not to make the mds main receiving thread being blocked with the retry (on the flow of processing data). The retry in this patch is ok since it retries on the mds flow control thread, so it does not delay the mds main receiving thread. Thanks Minh On 27/11/19 2:40 pm, Tran Thuan wrote: Hi Minh, I think it's good if retry some times for normal Send(). Do you have any idea how many retries? Interval b/w tries? Best Regards, ThuanTr -Original Message- From: Minh Hon Chau Sent: Wednesday, November 27, 2019 10:30 AM To: thuan.tran ; thang . d . nguyen ; 'Nguyen Minh Vu' ; gary@dektech.com.au Cc: opensaf-devel@lists.sourceforge.net Subject: Re: [PATCH 1/1] mds: Fix mds flow control keep all messages in queue [#3123] Hi Thuan, The TipcPortId:Send is also called at a few other places, do you think it is good if we make a wrapper of TipcPortId::Send with a few retries on failures, says TipcPortId::TryToSend(), and call TryToSend() instead of Send()? Thanks Minh On 27/11/19 1:26 pm, thuan.tran wrote: When overflow happens, mds with flow control enabled may keep all messages in queue if it fails to send a message when receiving Nack or ChunkAck since no more trigger come after that. MDS flow control should retry to send message in this scenario. 
--- src/mds/mds_tipc_fctrl_portid.cc | 16 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc index 724eb7b7b..e6e179669 100644 --- a/src/mds/mds_tipc_fctrl_portid.cc +++ b/src/mds/mds_tipc_fctrl_portid.cc @@ -17,6 +17,7 @@ #include "mds/mds_tipc_fctrl_portid.h" #include "base/ncssysf_def.h" +#include "base/osaf_time.h" #include "mds/mds_dt.h" #include "mds/mds_log.h" @@ -440,13 +441,14 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) { // try to send a few pending msg DataMessage* msg = nullptr; uint16_t send_msg_cnt = 0; -while (send_msg_cnt++ < chunk_size_) { +while (send_msg_cnt < chunk_size_) { // find the lowest sequence unsent yet msg = sndqueue_.FirstUnsent(); if (msg == nullptr) { break; } else { if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) { +send_msg_cnt++; msg->is_sent_ = true; m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], " "SndQData[fseq:%u, len:%u], " @@ -455,7 +457,10 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) { msg->header_.fseq_, msg->header_.msg_len_, sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_); } else { -break; +// If not retry, all messages are kept in queue +// and no more trigger to send messages +osaf_nanosleep(); +continue; } } } @@ -508,9 +513,12 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag, DataMessage* msg = sndqueue_.Find(Seq16(fseq)); if (msg != nullptr) { // Resend the msg found -if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) { - msg->is_sent_ = true; +while (Send(msg->msg_data_, msg->header_.msg_len_) != NCSCC_RC_SUCCESS) { + // If not retry, all messages are kept in queue + // and no more trigger to send messages + osaf_nanosleep(); } +msg->is_sent_ = true; m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], " "RsndData[mseq:%u, mfrag:%u, fseq:%u], " "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", ___ Opensaf-devel mailing 
list Opensaf-devel@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/opensaf-devel
Re: [devel] [PATCH 1/1] mds: Fix mds flow control keep all messages in queue [#3123]
Hi Minh, I think it's good if retry some times for normal Send(). Do you have any idea how many retries? Interval b/w tries? Best Regards, ThuanTr -Original Message- From: Minh Hon Chau Sent: Wednesday, November 27, 2019 10:30 AM To: thuan.tran ; thang . d . nguyen ; 'Nguyen Minh Vu' ; gary@dektech.com.au Cc: opensaf-devel@lists.sourceforge.net Subject: Re: [PATCH 1/1] mds: Fix mds flow control keep all messages in queue [#3123] Hi Thuan, The TipcPortId:Send is also called at a few other places, do you think it is good if we make a wrapper of TipcPortId::Send with a few retries on failures, says TipcPortId::TryToSend(), and call TryToSend() instead of Send()? Thanks Minh On 27/11/19 1:26 pm, thuan.tran wrote: > When overflow happens, mds with flow control enabled may keep > all messages in queue if it fails to send a message when receiving > Nack or ChunkAck since no more trigger come after that. > MDS flow control should retry to send message in this scenario. > --- > src/mds/mds_tipc_fctrl_portid.cc | 16 > 1 file changed, 12 insertions(+), 4 deletions(-) > > diff --git a/src/mds/mds_tipc_fctrl_portid.cc > b/src/mds/mds_tipc_fctrl_portid.cc > index 724eb7b7b..e6e179669 100644 > --- a/src/mds/mds_tipc_fctrl_portid.cc > +++ b/src/mds/mds_tipc_fctrl_portid.cc > @@ -17,6 +17,7 @@ > > #include "mds/mds_tipc_fctrl_portid.h" > #include "base/ncssysf_def.h" > +#include "base/osaf_time.h" > > #include "mds/mds_dt.h" > #include "mds/mds_log.h" > @@ -440,13 +441,14 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, > uint16_t chksize) { > // try to send a few pending msg > DataMessage* msg = nullptr; > uint16_t send_msg_cnt = 0; > -while (send_msg_cnt++ < chunk_size_) { > +while (send_msg_cnt < chunk_size_) { > // find the lowest sequence unsent yet > msg = sndqueue_.FirstUnsent(); > if (msg == nullptr) { > break; > } else { > if (Send(msg->msg_data_, msg->header_.msg_len_) == > NCSCC_RC_SUCCESS) { > +send_msg_cnt++; > msg->is_sent_ = true; > m_MDS_LOG_NOTIFY("FCTRL: [me] 
--> [node:%x, ref:%u], " > "SndQData[fseq:%u, len:%u], " > @@ -455,7 +457,10 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t > chksize) { > msg->header_.fseq_, msg->header_.msg_len_, > sndwnd_.acked_.v(), sndwnd_.send_.v(), > sndwnd_.nacked_space_); > } else { > -break; > +// If not retry, all messages are kept in queue > +// and no more trigger to send messages > +osaf_nanosleep(); > +continue; > } > } > } > @@ -508,9 +513,12 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t > mfrag, > DataMessage* msg = sndqueue_.Find(Seq16(fseq)); > if (msg != nullptr) { > // Resend the msg found > -if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) { > - msg->is_sent_ = true; > +while (Send(msg->msg_data_, msg->header_.msg_len_) != NCSCC_RC_SUCCESS) { > + // If not retry, all messages are kept in queue > + // and no more trigger to send messages > + osaf_nanosleep(); > } > +msg->is_sent_ = true; > m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], " > "RsndData[mseq:%u, mfrag:%u, fseq:%u], " > "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", ___ Opensaf-devel mailing list Opensaf-devel@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/opensaf-devel
Re: [devel] [PATCH 1/1] mds: Fix mds flow control keep all messages in queue [#3123]
Hi Thuan, The TipcPortId:Send is also called at a few other places, do you think it is good if we make a wrapper of TipcPortId::Send with a few retries on failures, says TipcPortId::TryToSend(), and call TryToSend() instead of Send()? Thanks Minh On 27/11/19 1:26 pm, thuan.tran wrote: When overflow happens, mds with flow control enabled may keep all messages in queue if it fails to send a message when receiving Nack or ChunkAck since no more trigger come after that. MDS flow control should retry to send message in this scenario. --- src/mds/mds_tipc_fctrl_portid.cc | 16 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc index 724eb7b7b..e6e179669 100644 --- a/src/mds/mds_tipc_fctrl_portid.cc +++ b/src/mds/mds_tipc_fctrl_portid.cc @@ -17,6 +17,7 @@ #include "mds/mds_tipc_fctrl_portid.h" #include "base/ncssysf_def.h" +#include "base/osaf_time.h" #include "mds/mds_dt.h" #include "mds/mds_log.h" @@ -440,13 +441,14 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) { // try to send a few pending msg DataMessage* msg = nullptr; uint16_t send_msg_cnt = 0; -while (send_msg_cnt++ < chunk_size_) { +while (send_msg_cnt < chunk_size_) { // find the lowest sequence unsent yet msg = sndqueue_.FirstUnsent(); if (msg == nullptr) { break; } else { if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) { +send_msg_cnt++; msg->is_sent_ = true; m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], " "SndQData[fseq:%u, len:%u], " @@ -455,7 +457,10 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) { msg->header_.fseq_, msg->header_.msg_len_, sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_); } else { -break; +// If not retry, all messages are kept in queue +// and no more trigger to send messages +osaf_nanosleep(); +continue; } } } @@ -508,9 +513,12 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag, DataMessage* msg = 
sndqueue_.Find(Seq16(fseq)); if (msg != nullptr) { // Resend the msg found -if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) { - msg->is_sent_ = true; +while (Send(msg->msg_data_, msg->header_.msg_len_) != NCSCC_RC_SUCCESS) { + // If not retry, all messages are kept in queue + // and no more trigger to send messages + osaf_nanosleep(); } +msg->is_sent_ = true; m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], " "RsndData[mseq:%u, mfrag:%u, fseq:%u], " "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", ___ Opensaf-devel mailing list Opensaf-devel@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/opensaf-devel
[devel] [PATCH 1/1] mds: Fix mds flow control keep all messages in queue [#3123]
When overflow happens, mds with flow control enabled may keep all messages in queue if it fails to send a message when receiving Nack or ChunkAck since no more triggers come after that. MDS flow control should retry sending the message in this scenario. --- src/mds/mds_tipc_fctrl_portid.cc | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc index 724eb7b7b..e6e179669 100644 --- a/src/mds/mds_tipc_fctrl_portid.cc +++ b/src/mds/mds_tipc_fctrl_portid.cc @@ -17,6 +17,7 @@ #include "mds/mds_tipc_fctrl_portid.h" #include "base/ncssysf_def.h" +#include "base/osaf_time.h" #include "mds/mds_dt.h" #include "mds/mds_log.h" @@ -440,13 +441,14 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) { // try to send a few pending msg DataMessage* msg = nullptr; uint16_t send_msg_cnt = 0; -while (send_msg_cnt++ < chunk_size_) { +while (send_msg_cnt < chunk_size_) { // find the lowest sequence unsent yet msg = sndqueue_.FirstUnsent(); if (msg == nullptr) { break; } else { if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) { +send_msg_cnt++; msg->is_sent_ = true; m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], " "SndQData[fseq:%u, len:%u], " @@ -455,7 +457,10 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) { msg->header_.fseq_, msg->header_.msg_len_, sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_); } else { -break; +// If not retry, all messages are kept in queue +// and no more trigger to send messages +osaf_nanosleep(); +continue; } } } @@ -508,9 +513,12 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag, DataMessage* msg = sndqueue_.Find(Seq16(fseq)); if (msg != nullptr) { // Resend the msg found -if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) { - msg->is_sent_ = true; +while (Send(msg->msg_data_, msg->header_.msg_len_) != NCSCC_RC_SUCCESS) { + // If not retry, all messages are kept in queue + // and
no more trigger to send messages + osaf_nanosleep(); } +msg->is_sent_ = true; m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], " "RsndData[mseq:%u, mfrag:%u, fseq:%u], " "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", -- 2.17.1 ___ Opensaf-devel mailing list Opensaf-devel@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/opensaf-devel