Re: [devel] [PATCH 1/1] mds: Fix mds flow control keep all messages in queue [#3123]

2019-11-28 Thread Minh Hon Chau

Hi Thuan,

ack with comments.

Thanks

Minh

On 28/11/19 6:55 pm, thuan.tran wrote:

When overflow happens, mds with flow control enabled may keep
all messages in queue if it fails to send a message when receiving
Nack or ChunkAck since no more trigger come after that.
MDS flow control should retry to send message in this scenario.
---
  src/mds/mds_tipc_fctrl_portid.cc | 47 ++--
  1 file changed, 32 insertions(+), 15 deletions(-)

diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc
index 316e1ba75..d5314d5bc 100644
--- a/src/mds/mds_tipc_fctrl_portid.cc
+++ b/src/mds/mds_tipc_fctrl_portid.cc
@@ -17,6 +17,7 @@
  
  #include "mds/mds_tipc_fctrl_portid.h"

  #include "base/ncssysf_def.h"
+#include "base/osaf_time.h"
  
  #include "mds/mds_dt.h"

  #include "mds/mds_log.h"
@@ -149,23 +150,24 @@ void TipcPortId::FlushData() {
  
  uint32_t TipcPortId::Send(uint8_t* data, uint16_t length) {

struct sockaddr_tipc server_addr;
-  ssize_t send_len = 0;
-  uint32_t rc = NCSCC_RC_SUCCESS;
-
memset(_addr, 0, sizeof(server_addr));
server_addr.family = AF_TIPC;
server_addr.addrtype = TIPC_ADDR_ID;
server_addr.addr.id = id_;
-  send_len = sendto(bsrsock_, data, length, 0,
-(struct sockaddr *)_addr, sizeof(server_addr));
-
-  if (send_len == length) {
-rc = NCSCC_RC_SUCCESS;
-  } else {
-m_MDS_LOG_ERR("FCTRL: sendto() failed, Error[%s]", strerror(errno));
-rc = NCSCC_RC_FAILURE;
+  int retry = 5;
+  while (retry >= 0) {
+ssize_t send_len = sendto(bsrsock_, data, length, 0,
+  (struct sockaddr *)_addr, sizeof(server_addr));
+
+if (send_len == length) {
+  return NCSCC_RC_SUCCESS;
+} else if (retry-- > 0) {
+  assert(errno == ENOMEM || errno == ENOBUFS);
+  osaf_nanosleep();
+}
}
[Minh] It might be a good idea to make a wrapper for sendto(), since 
sendto() is currently called in both fctrl_portid.cc and mds_dt_tipc.c. We 
would then call only the wrapper, which handles the error codes of 
sendto(). I think EINTR is the only error code that needs to be checked; 
there are a few places in opensaf that handle the error codes of sendto() 
which we can take as a reference.

-  return rc;
+  m_MDS_LOG_ERR("FCTRL: sendto() failed, Error[%s]", strerror(errno));
+  return NCSCC_RC_FAILURE;
  }
  
  uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length,

@@ -440,13 +442,16 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t 
chksize) {
  // try to send a few pending msg
  DataMessage* msg = nullptr;
  uint16_t send_msg_cnt = 0;
-while (send_msg_cnt++ < chunk_size_) {
+int retry = 0;
+while (send_msg_cnt < chunk_size_) {
// find the lowest sequence unsent yet
msg = sndqueue_.FirstUnsent();
if (msg == nullptr) {
  break;
} else {
if (Send(msg->msg_data_, msg->header_.msg_len_) == 
NCSCC_RC_SUCCESS) {
+retry = 0;
+send_msg_cnt++;
  msg->is_sent_ = true;
  m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
  "SndQData[fseq:%u, len:%u], "
@@ -454,6 +459,12 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t 
chksize) {
  id_.node, id_.ref,
  msg->header_.fseq_, msg->header_.msg_len_,
  sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
+  } else if (send_msg_cnt == 0) {
+// If not retry, all messages are kept in queue
+// and no more trigger to send messages
+retry++;
+assert(retry < 100);
+continue;


[Minh] We can accept using the assert for now, and 100 should be 
defined as a constant. But I do think we need a fallback mechanism: if the 
socket fd is not able to send data, we can terminate the portid and 
trigger an MDS_DOWN event, ... and this could be looked into in another ticket.


Also, the patch title does not seem to be right in the context of this 
ticket, where the problem is "Cannot allocate memory": we might not 
be able to send any more messages (not that for all) and hit the assert. 
We could say "Add retry for tipc sendto()", unless you have a better 
description for it.



} else {
  break;
}
@@ -508,9 +519,15 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag,
DataMessage* msg = sndqueue_.Find(Seq16(fseq));
if (msg != nullptr) {
  // Resend the msg found
-if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) {
-  msg->is_sent_ = true;
+int retry = 0;
+while (Send(msg->msg_data_, msg->header_.msg_len_) != NCSCC_RC_SUCCESS) {
+  // If not retry, all messages are kept in queue
+  // and no more trigger to send messages
+  retry++;
+  assert(retry < 100);
+  continue;
  }
+msg->is_sent_ = true;
  m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
  "RsndData[mseq:%u, mfrag:%u, fseq:%u], "

[devel] [PATCH 1/1] mds: Fix mds flow control keep all messages in queue [#3123]

2019-11-27 Thread thuan.tran
When overflow happens, mds with flow control enabled may keep
all messages in queue if it fails to send a message when receiving
Nack or ChunkAck since no more trigger come after that.
MDS flow control should retry to send message in this scenario.
---
 src/mds/mds_tipc_fctrl_portid.cc | 47 ++--
 1 file changed, 32 insertions(+), 15 deletions(-)

diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc
index 316e1ba75..d5314d5bc 100644
--- a/src/mds/mds_tipc_fctrl_portid.cc
+++ b/src/mds/mds_tipc_fctrl_portid.cc
@@ -17,6 +17,7 @@
 
 #include "mds/mds_tipc_fctrl_portid.h"
 #include "base/ncssysf_def.h"
+#include "base/osaf_time.h"
 
 #include "mds/mds_dt.h"
 #include "mds/mds_log.h"
@@ -149,23 +150,24 @@ void TipcPortId::FlushData() {
 
 uint32_t TipcPortId::Send(uint8_t* data, uint16_t length) {
   struct sockaddr_tipc server_addr;
-  ssize_t send_len = 0;
-  uint32_t rc = NCSCC_RC_SUCCESS;
-
   memset(_addr, 0, sizeof(server_addr));
   server_addr.family = AF_TIPC;
   server_addr.addrtype = TIPC_ADDR_ID;
   server_addr.addr.id = id_;
-  send_len = sendto(bsrsock_, data, length, 0,
-(struct sockaddr *)_addr, sizeof(server_addr));
-
-  if (send_len == length) {
-rc = NCSCC_RC_SUCCESS;
-  } else {
-m_MDS_LOG_ERR("FCTRL: sendto() failed, Error[%s]", strerror(errno));
-rc = NCSCC_RC_FAILURE;
+  int retry = 5;
+  while (retry >= 0) {
+ssize_t send_len = sendto(bsrsock_, data, length, 0,
+  (struct sockaddr *)_addr, sizeof(server_addr));
+
+if (send_len == length) {
+  return NCSCC_RC_SUCCESS;
+} else if (retry-- > 0) {
+  assert(errno == ENOMEM || errno == ENOBUFS);
+  osaf_nanosleep();
+}
   }
-  return rc;
+  m_MDS_LOG_ERR("FCTRL: sendto() failed, Error[%s]", strerror(errno));
+  return NCSCC_RC_FAILURE;
 }
 
 uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length,
@@ -440,13 +442,16 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t 
chksize) {
 // try to send a few pending msg
 DataMessage* msg = nullptr;
 uint16_t send_msg_cnt = 0;
-while (send_msg_cnt++ < chunk_size_) {
+int retry = 0;
+while (send_msg_cnt < chunk_size_) {
   // find the lowest sequence unsent yet
   msg = sndqueue_.FirstUnsent();
   if (msg == nullptr) {
 break;
   } else {
   if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) 
{
+retry = 0;
+send_msg_cnt++;
 msg->is_sent_ = true;
 m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
 "SndQData[fseq:%u, len:%u], "
@@ -454,6 +459,12 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t 
chksize) {
 id_.node, id_.ref,
 msg->header_.fseq_, msg->header_.msg_len_,
 sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
+  } else if (send_msg_cnt == 0) {
+// If not retry, all messages are kept in queue
+// and no more trigger to send messages
+retry++;
+assert(retry < 100);
+continue;
   } else {
 break;
   }
@@ -508,9 +519,15 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag,
   DataMessage* msg = sndqueue_.Find(Seq16(fseq));
   if (msg != nullptr) {
 // Resend the msg found
-if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) {
-  msg->is_sent_ = true;
+int retry = 0;
+while (Send(msg->msg_data_, msg->header_.msg_len_) != NCSCC_RC_SUCCESS) {
+  // If not retry, all messages are kept in queue
+  // and no more trigger to send messages
+  retry++;
+  assert(retry < 100);
+  continue;
 }
+msg->is_sent_ = true;
 m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
 "RsndData[mseq:%u, mfrag:%u, fseq:%u], "
 "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
-- 
2.17.1



___
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel


Re: [devel] [PATCH 1/1] mds: Fix mds flow control keep all messages in queue [#3123]

2019-11-27 Thread Tran Thuan
Hi Vu,

Thanks. See my reply inline.

Best Regards,
ThuanTr

-Original Message-
From: Nguyen Minh Vu  
Sent: Thursday, November 28, 2019 10:36 AM
To: thuan.tran ; 'Minh Hon Chau' 
; 'thang . d . nguyen' 
; gary@dektech.com.au
Cc: opensaf-devel@lists.sourceforge.net
Subject: Re: [PATCH 1/1] mds: Fix mds flow control keep all messages in queue 
[#3123]

Hi Thuan,

Ack with comments inline.

Regards, Vu

On 11/27/19 6:33 PM, thuan.tran wrote:
> When overflow happens, mds with flow control enabled may keep
> all messages in queue if it fails to send a message when receiving
> Nack or ChunkAck since no more trigger come after that.
> MDS flow control should retry to send message in this scenario.
> ---
>   src/mds/mds_tipc_fctrl_portid.cc | 39 +++-
>   1 file changed, 23 insertions(+), 16 deletions(-)
>
> diff --git a/src/mds/mds_tipc_fctrl_portid.cc 
> b/src/mds/mds_tipc_fctrl_portid.cc
> index 316e1ba75..8081e8bd4 100644
> --- a/src/mds/mds_tipc_fctrl_portid.cc
> +++ b/src/mds/mds_tipc_fctrl_portid.cc
> @@ -17,6 +17,7 @@
>   
>   #include "mds/mds_tipc_fctrl_portid.h"
>   #include "base/ncssysf_def.h"
> +#include "base/osaf_time.h"
>   
>   #include "mds/mds_dt.h"
>   #include "mds/mds_log.h"
> @@ -149,23 +150,23 @@ void TipcPortId::FlushData() {
>   
>   uint32_t TipcPortId::Send(uint8_t* data, uint16_t length) {
> struct sockaddr_tipc server_addr;
> -  ssize_t send_len = 0;
> -  uint32_t rc = NCSCC_RC_SUCCESS;
> -
> memset(_addr, 0, sizeof(server_addr));
> server_addr.family = AF_TIPC;
> server_addr.addrtype = TIPC_ADDR_ID;
> server_addr.addr.id = id_;
> -  send_len = sendto(bsrsock_, data, length, 0,
> -(struct sockaddr *)_addr, sizeof(server_addr));
> -
> -  if (send_len == length) {
> -rc = NCSCC_RC_SUCCESS;
> -  } else {
> -m_MDS_LOG_ERR("FCTRL: sendto() failed, Error[%s]", strerror(errno));
> -rc = NCSCC_RC_FAILURE;
> +  int retry = 5;
> +  while (retry >= 0) {
> +ssize_t send_len = sendto(bsrsock_, data, length, 0,
> +  (struct sockaddr *)_addr, sizeof(server_addr));
> +
[Vu] Is there any case where sendto() sends only part of the data? If so, the 
retry, if any, should not start from the beginning of the data.
The code below shows what I mean:

ssize_t byte_sent = 0;
while (retry--) {
   ssize_t send_len = sendto(bsrsock_, data + byte_sent, length - byte_sent, 0,
  (struct sockaddr *)_addr, sizeof(server_addr));
if (send_lenn == -1) {
  // error handling here
  if (errno == EINTR) continue;
  // error, can't continue. should log something here?
  return NCSCC_RC_FAILURE; // or assert?
}

// number of bytes sent
byte_sent += send_data;
 if (byte_sent >= length) {
   return NCSCC_RC_SUCCESS;
 }
 
 // retry but do we need to sleep here?
 osaf_nanosleep();
}
  
[Thuan]: I think there is no case where only part of a message is sent.
Even if there were, the incomplete message would not be accepted by the receiver.
The receiver does not reassemble unfragmented messages.

> +if (send_len == length) {
> +  return NCSCC_RC_SUCCESS;
> +} else if (retry-- > 0) {
> +  osaf_nanosleep();
> +}
> }
> -  return rc;
> +  m_MDS_LOG_ERR("FCTRL: sendto() failed, Error[%s]", strerror(errno));
> +  return NCSCC_RC_FAILURE;
>   }
>   
>   uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length,
> @@ -440,13 +441,14 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, 
> uint16_t chksize) {
>   // try to send a few pending msg
>   DataMessage* msg = nullptr;
>   uint16_t send_msg_cnt = 0;
> -while (send_msg_cnt++ < chunk_size_) {
> +while (send_msg_cnt < chunk_size_) {
> // find the lowest sequence unsent yet
> msg = sndqueue_.FirstUnsent();
> if (msg == nullptr) {
>   break;
> } else {
> if (Send(msg->msg_data_, msg->header_.msg_len_) == 
> NCSCC_RC_SUCCESS) {
> +send_msg_cnt++;
>   msg->is_sent_ = true;
>   m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
>   "SndQData[fseq:%u, len:%u], "
> @@ -455,7 +457,9 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t 
> chksize) {
>   msg->header_.fseq_, msg->header_.msg_len_,
>   sndwnd_.acked_.v(), sndwnd_.send_.v(), 
> sndwnd_.nacked_space_);
> } else {
> -break;
> +// If not retry, all messages are kept in queue
> +// and no more trigger to send messages
> +continue;
[Vu] If the send constantly fails, does this loop have no way to exit?
[Thuan] Yes
> }
> }
>   }
> @@ -508,9 +512,12 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t 
> mfrag,
> DataMessage* msg = sndqueue_.Find(Seq16(fseq));
> if (msg != nullptr) {
>   // Resend the msg found
> -if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) {
> -  msg->is_sent_ = true;
> +while (Send(msg->msg_data_, 

Re: [devel] [PATCH 1/1] mds: Fix mds flow control keep all messages in queue [#3123]

2019-11-27 Thread Nguyen Minh Vu

Hi Thuan,

Ack with comments inline.

Regards, Vu

On 11/27/19 6:33 PM, thuan.tran wrote:

When overflow happens, mds with flow control enabled may keep
all messages in queue if it fails to send a message when receiving
Nack or ChunkAck since no more trigger come after that.
MDS flow control should retry to send message in this scenario.
---
  src/mds/mds_tipc_fctrl_portid.cc | 39 +++-
  1 file changed, 23 insertions(+), 16 deletions(-)

diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc
index 316e1ba75..8081e8bd4 100644
--- a/src/mds/mds_tipc_fctrl_portid.cc
+++ b/src/mds/mds_tipc_fctrl_portid.cc
@@ -17,6 +17,7 @@
  
  #include "mds/mds_tipc_fctrl_portid.h"

  #include "base/ncssysf_def.h"
+#include "base/osaf_time.h"
  
  #include "mds/mds_dt.h"

  #include "mds/mds_log.h"
@@ -149,23 +150,23 @@ void TipcPortId::FlushData() {
  
  uint32_t TipcPortId::Send(uint8_t* data, uint16_t length) {

struct sockaddr_tipc server_addr;
-  ssize_t send_len = 0;
-  uint32_t rc = NCSCC_RC_SUCCESS;
-
memset(_addr, 0, sizeof(server_addr));
server_addr.family = AF_TIPC;
server_addr.addrtype = TIPC_ADDR_ID;
server_addr.addr.id = id_;
-  send_len = sendto(bsrsock_, data, length, 0,
-(struct sockaddr *)_addr, sizeof(server_addr));
-
-  if (send_len == length) {
-rc = NCSCC_RC_SUCCESS;
-  } else {
-m_MDS_LOG_ERR("FCTRL: sendto() failed, Error[%s]", strerror(errno));
-rc = NCSCC_RC_FAILURE;
+  int retry = 5;
+  while (retry >= 0) {
+ssize_t send_len = sendto(bsrsock_, data, length, 0,
+  (struct sockaddr *)_addr, sizeof(server_addr));
+
[Vu] Is there any case where sendto() sends only part of the data? If so, the 
retry, if any, should not start from the beginning of the data.

The code below shows what I mean:

ssize_t byte_sent = 0;
while (retry--) {
  ssize_t send_len = sendto(bsrsock_, data + byte_sent, length - byte_sent, 0,
 (struct sockaddr *)_addr, sizeof(server_addr));
   if (send_lenn == -1) {
 // error handling here
 if (errno == EINTR) continue;
 // error, can't continue. should log something here?
 return NCSCC_RC_FAILURE; // or assert?
   }
   
   // number of bytes sent

   byte_sent += send_data;
if (byte_sent >= length) {
  return NCSCC_RC_SUCCESS;
}

// retry but do we need to sleep here?

osaf_nanosleep();
}
 


+if (send_len == length) {
+  return NCSCC_RC_SUCCESS;
+} else if (retry-- > 0) {
+  osaf_nanosleep();
+}
}
-  return rc;
+  m_MDS_LOG_ERR("FCTRL: sendto() failed, Error[%s]", strerror(errno));
+  return NCSCC_RC_FAILURE;
  }
  
  uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length,

@@ -440,13 +441,14 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t 
chksize) {
  // try to send a few pending msg
  DataMessage* msg = nullptr;
  uint16_t send_msg_cnt = 0;
-while (send_msg_cnt++ < chunk_size_) {
+while (send_msg_cnt < chunk_size_) {
// find the lowest sequence unsent yet
msg = sndqueue_.FirstUnsent();
if (msg == nullptr) {
  break;
} else {
if (Send(msg->msg_data_, msg->header_.msg_len_) == 
NCSCC_RC_SUCCESS) {
+send_msg_cnt++;
  msg->is_sent_ = true;
  m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
  "SndQData[fseq:%u, len:%u], "
@@ -455,7 +457,9 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t 
chksize) {
  msg->header_.fseq_, msg->header_.msg_len_,
  sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
} else {
-break;
+// If not retry, all messages are kept in queue
+// and no more trigger to send messages
+continue;

[Vu] If the send constantly fails, does this loop have no way to exit?

}
}
  }
@@ -508,9 +512,12 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag,
DataMessage* msg = sndqueue_.Find(Seq16(fseq));
if (msg != nullptr) {
  // Resend the msg found
-if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) {
-  msg->is_sent_ = true;
+while (Send(msg->msg_data_, msg->header_.msg_len_) != NCSCC_RC_SUCCESS) {
+  // If not retry, all messages are kept in queue
+  // and no more trigger to send messages
+  continue;

[Vu] If the send constantly fails, does this loop have no way to exit?

  }
+msg->is_sent_ = true;
  m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
  "RsndData[mseq:%u, mfrag:%u, fseq:%u], "
  "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",




___
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel


[devel] [PATCH 1/1] mds: Fix mds flow control keep all messages in queue [#3123]

2019-11-27 Thread thuan.tran
When overflow happens, mds with flow control enabled may keep
all messages in queue if it fails to send a message when receiving
Nack or ChunkAck since no more trigger come after that.
MDS flow control should retry to send message in this scenario.
---
 src/mds/mds_tipc_fctrl_portid.cc | 39 +++-
 1 file changed, 23 insertions(+), 16 deletions(-)

diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc
index 316e1ba75..8081e8bd4 100644
--- a/src/mds/mds_tipc_fctrl_portid.cc
+++ b/src/mds/mds_tipc_fctrl_portid.cc
@@ -17,6 +17,7 @@
 
 #include "mds/mds_tipc_fctrl_portid.h"
 #include "base/ncssysf_def.h"
+#include "base/osaf_time.h"
 
 #include "mds/mds_dt.h"
 #include "mds/mds_log.h"
@@ -149,23 +150,23 @@ void TipcPortId::FlushData() {
 
 uint32_t TipcPortId::Send(uint8_t* data, uint16_t length) {
   struct sockaddr_tipc server_addr;
-  ssize_t send_len = 0;
-  uint32_t rc = NCSCC_RC_SUCCESS;
-
   memset(_addr, 0, sizeof(server_addr));
   server_addr.family = AF_TIPC;
   server_addr.addrtype = TIPC_ADDR_ID;
   server_addr.addr.id = id_;
-  send_len = sendto(bsrsock_, data, length, 0,
-(struct sockaddr *)_addr, sizeof(server_addr));
-
-  if (send_len == length) {
-rc = NCSCC_RC_SUCCESS;
-  } else {
-m_MDS_LOG_ERR("FCTRL: sendto() failed, Error[%s]", strerror(errno));
-rc = NCSCC_RC_FAILURE;
+  int retry = 5;
+  while (retry >= 0) {
+ssize_t send_len = sendto(bsrsock_, data, length, 0,
+  (struct sockaddr *)_addr, sizeof(server_addr));
+
+if (send_len == length) {
+  return NCSCC_RC_SUCCESS;
+} else if (retry-- > 0) {
+  osaf_nanosleep();
+}
   }
-  return rc;
+  m_MDS_LOG_ERR("FCTRL: sendto() failed, Error[%s]", strerror(errno));
+  return NCSCC_RC_FAILURE;
 }
 
 uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length,
@@ -440,13 +441,14 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t 
chksize) {
 // try to send a few pending msg
 DataMessage* msg = nullptr;
 uint16_t send_msg_cnt = 0;
-while (send_msg_cnt++ < chunk_size_) {
+while (send_msg_cnt < chunk_size_) {
   // find the lowest sequence unsent yet
   msg = sndqueue_.FirstUnsent();
   if (msg == nullptr) {
 break;
   } else {
   if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) 
{
+send_msg_cnt++;
 msg->is_sent_ = true;
 m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
 "SndQData[fseq:%u, len:%u], "
@@ -455,7 +457,9 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t 
chksize) {
 msg->header_.fseq_, msg->header_.msg_len_,
 sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
   } else {
-break;
+// If not retry, all messages are kept in queue
+// and no more trigger to send messages
+continue;
   }
   }
 }
@@ -508,9 +512,12 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag,
   DataMessage* msg = sndqueue_.Find(Seq16(fseq));
   if (msg != nullptr) {
 // Resend the msg found
-if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) {
-  msg->is_sent_ = true;
+while (Send(msg->msg_data_, msg->header_.msg_len_) != NCSCC_RC_SUCCESS) {
+  // If not retry, all messages are kept in queue
+  // and no more trigger to send messages
+  continue;
 }
+msg->is_sent_ = true;
 m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
 "RsndData[mseq:%u, mfrag:%u, fseq:%u], "
 "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
-- 
2.17.1



___
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel


Re: [devel] [PATCH 1/1] mds: Fix mds flow control keep all messages in queue [#3123]

2019-11-26 Thread Minh Hon Chau
But if all retries still fail, we might need to terminate the 
portid, which leads to an MDS DOWN event; let's look at that later.


On 27/11/19 3:23 pm, Minh Hon Chau wrote:

Hi Thuan,

I'm thinking of retrying 3 times with 100 ms in between, but you can 
decide. Also, we need to ensure that the retry does not block the mds main 
receiving thread (on the data-processing flow). 
The retry in this patch is OK since it retries on the mds flow control 
thread, so it does not delay the mds main receiving thread.


Thanks

Minh

On 27/11/19 2:40 pm, Tran Thuan wrote:

Hi Minh,

I think it would be good to retry a few times in the normal Send().
Do you have any idea how many retries, and what interval between tries?

Best Regards,
ThuanTr

-Original Message-
From: Minh Hon Chau 
Sent: Wednesday, November 27, 2019 10:30 AM
To: thuan.tran ; thang . d . nguyen 
; 'Nguyen Minh Vu' 
; gary@dektech.com.au

Cc: opensaf-devel@lists.sourceforge.net
Subject: Re: [PATCH 1/1] mds: Fix mds flow control keep all messages 
in queue [#3123]


Hi Thuan,

TipcPortId::Send is also called in a few other places. Do you think
it would be good to make a wrapper around TipcPortId::Send with a few retries
on failure, say TipcPortId::TryToSend(), and call TryToSend() instead
of Send()?

Thanks

Minh

On 27/11/19 1:26 pm, thuan.tran wrote:

When overflow happens, mds with flow control enabled may keep
all messages in queue if it fails to send a message when receiving
Nack or ChunkAck since no more trigger come after that.
MDS flow control should retry to send message in this scenario.
---
   src/mds/mds_tipc_fctrl_portid.cc | 16 
   1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/src/mds/mds_tipc_fctrl_portid.cc 
b/src/mds/mds_tipc_fctrl_portid.cc

index 724eb7b7b..e6e179669 100644
--- a/src/mds/mds_tipc_fctrl_portid.cc
+++ b/src/mds/mds_tipc_fctrl_portid.cc
@@ -17,6 +17,7 @@
      #include "mds/mds_tipc_fctrl_portid.h"
   #include "base/ncssysf_def.h"
+#include "base/osaf_time.h"
      #include "mds/mds_dt.h"
   #include "mds/mds_log.h"
@@ -440,13 +441,14 @@ void TipcPortId::ReceiveChunkAck(uint16_t 
fseq, uint16_t chksize) {

   // try to send a few pending msg
   DataMessage* msg = nullptr;
   uint16_t send_msg_cnt = 0;
-    while (send_msg_cnt++ < chunk_size_) {
+    while (send_msg_cnt < chunk_size_) {
 // find the lowest sequence unsent yet
 msg = sndqueue_.FirstUnsent();
 if (msg == nullptr) {
   break;
 } else {
 if (Send(msg->msg_data_, msg->header_.msg_len_) == 
NCSCC_RC_SUCCESS) {

+    send_msg_cnt++;
   msg->is_sent_ = true;
   m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
   "SndQData[fseq:%u, len:%u], "
@@ -455,7 +457,10 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, 
uint16_t chksize) {

   msg->header_.fseq_, msg->header_.msg_len_,
   sndwnd_.acked_.v(), sndwnd_.send_.v(), 
sndwnd_.nacked_space_);

 } else {
-    break;
+    // If not retry, all messages are kept in queue
+    // and no more trigger to send messages
+    osaf_nanosleep();
+    continue;
 }
 }
   }
@@ -508,9 +513,12 @@ void TipcPortId::ReceiveNack(uint32_t mseq, 
uint16_t mfrag,

 DataMessage* msg = sndqueue_.Find(Seq16(fseq));
 if (msg != nullptr) {
   // Resend the msg found
-    if (Send(msg->msg_data_, msg->header_.msg_len_) == 
NCSCC_RC_SUCCESS) {

-  msg->is_sent_ = true;
+    while (Send(msg->msg_data_, msg->header_.msg_len_) != 
NCSCC_RC_SUCCESS) {

+  // If not retry, all messages are kept in queue
+  // and no more trigger to send messages
+  osaf_nanosleep();
   }
+    msg->is_sent_ = true;
   m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
   "RsndData[mseq:%u, mfrag:%u, fseq:%u], "
   "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",





___
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel


Re: [devel] [PATCH 1/1] mds: Fix mds flow control keep all messages in queue [#3123]

2019-11-26 Thread Minh Hon Chau

Hi Thuan,

I'm thinking of retrying 3 times with 100 ms in between, but you can 
decide. Also, we need to ensure that the retry does not block the mds main 
receiving thread (on the data-processing flow). The retry 
in this patch is OK since it retries on the mds flow control thread, so 
it does not delay the mds main receiving thread.


Thanks

Minh

On 27/11/19 2:40 pm, Tran Thuan wrote:

Hi Minh,

I think it would be good to retry a few times in the normal Send().
Do you have any idea how many retries, and what interval between tries?

Best Regards,
ThuanTr

-Original Message-
From: Minh Hon Chau 
Sent: Wednesday, November 27, 2019 10:30 AM
To: thuan.tran ; thang . d . nguyen 
; 'Nguyen Minh Vu' ; 
gary@dektech.com.au
Cc: opensaf-devel@lists.sourceforge.net
Subject: Re: [PATCH 1/1] mds: Fix mds flow control keep all messages in queue 
[#3123]

Hi Thuan,

TipcPortId::Send is also called in a few other places. Do you think
it would be good to make a wrapper around TipcPortId::Send with a few retries
on failure, say TipcPortId::TryToSend(), and call TryToSend() instead
of Send()?

Thanks

Minh

On 27/11/19 1:26 pm, thuan.tran wrote:

When overflow happens, mds with flow control enabled may keep
all messages in queue if it fails to send a message when receiving
Nack or ChunkAck since no more trigger come after that.
MDS flow control should retry to send message in this scenario.
---
   src/mds/mds_tipc_fctrl_portid.cc | 16 
   1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc
index 724eb7b7b..e6e179669 100644
--- a/src/mds/mds_tipc_fctrl_portid.cc
+++ b/src/mds/mds_tipc_fctrl_portid.cc
@@ -17,6 +17,7 @@
   
   #include "mds/mds_tipc_fctrl_portid.h"

   #include "base/ncssysf_def.h"
+#include "base/osaf_time.h"
   
   #include "mds/mds_dt.h"

   #include "mds/mds_log.h"
@@ -440,13 +441,14 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t 
chksize) {
   // try to send a few pending msg
   DataMessage* msg = nullptr;
   uint16_t send_msg_cnt = 0;
-while (send_msg_cnt++ < chunk_size_) {
+while (send_msg_cnt < chunk_size_) {
 // find the lowest sequence unsent yet
 msg = sndqueue_.FirstUnsent();
 if (msg == nullptr) {
   break;
 } else {
 if (Send(msg->msg_data_, msg->header_.msg_len_) == 
NCSCC_RC_SUCCESS) {
+send_msg_cnt++;
   msg->is_sent_ = true;
   m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
   "SndQData[fseq:%u, len:%u], "
@@ -455,7 +457,10 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t 
chksize) {
   msg->header_.fseq_, msg->header_.msg_len_,
   sndwnd_.acked_.v(), sndwnd_.send_.v(), 
sndwnd_.nacked_space_);
 } else {
-break;
+// If not retry, all messages are kept in queue
+// and no more trigger to send messages
+osaf_nanosleep();
+continue;
 }
 }
   }
@@ -508,9 +513,12 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag,
 DataMessage* msg = sndqueue_.Find(Seq16(fseq));
 if (msg != nullptr) {
   // Resend the msg found
-if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) {
-  msg->is_sent_ = true;
+while (Send(msg->msg_data_, msg->header_.msg_len_) != NCSCC_RC_SUCCESS) {
+  // If not retry, all messages are kept in queue
+  // and no more trigger to send messages
+  osaf_nanosleep();
   }
+msg->is_sent_ = true;
   m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
   "RsndData[mseq:%u, mfrag:%u, fseq:%u], "
   "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",





___
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel


Re: [devel] [PATCH 1/1] mds: Fix mds flow control keep all messages in queue [#3123]

2019-11-26 Thread Tran Thuan
Hi Minh,

I think it would be good to retry a few times in the normal Send().
Do you have any idea how many retries, and what interval between tries?

Best Regards,
ThuanTr

-Original Message-
From: Minh Hon Chau  
Sent: Wednesday, November 27, 2019 10:30 AM
To: thuan.tran ; thang . d . nguyen 
; 'Nguyen Minh Vu' ; 
gary@dektech.com.au
Cc: opensaf-devel@lists.sourceforge.net
Subject: Re: [PATCH 1/1] mds: Fix mds flow control keep all messages in queue 
[#3123]

Hi Thuan,

TipcPortId::Send is also called in a few other places. Do you think 
it would be good to make a wrapper around TipcPortId::Send with a few retries 
on failure, say TipcPortId::TryToSend(), and call TryToSend() instead 
of Send()?

Thanks

Minh

On 27/11/19 1:26 pm, thuan.tran wrote:
> When overflow happens, mds with flow control enabled may keep
> all messages in queue if it fails to send a message when receiving
> Nack or ChunkAck since no more trigger come after that.
> MDS flow control should retry to send message in this scenario.
> ---
>   src/mds/mds_tipc_fctrl_portid.cc | 16 
>   1 file changed, 12 insertions(+), 4 deletions(-)
>
> diff --git a/src/mds/mds_tipc_fctrl_portid.cc 
> b/src/mds/mds_tipc_fctrl_portid.cc
> index 724eb7b7b..e6e179669 100644
> --- a/src/mds/mds_tipc_fctrl_portid.cc
> +++ b/src/mds/mds_tipc_fctrl_portid.cc
> @@ -17,6 +17,7 @@
>   
>   #include "mds/mds_tipc_fctrl_portid.h"
>   #include "base/ncssysf_def.h"
> +#include "base/osaf_time.h"
>   
>   #include "mds/mds_dt.h"
>   #include "mds/mds_log.h"
> @@ -440,13 +441,14 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, 
> uint16_t chksize) {
>   // try to send a few pending msg
>   DataMessage* msg = nullptr;
>   uint16_t send_msg_cnt = 0;
> -while (send_msg_cnt++ < chunk_size_) {
> +while (send_msg_cnt < chunk_size_) {
> // find the lowest sequence unsent yet
> msg = sndqueue_.FirstUnsent();
> if (msg == nullptr) {
>   break;
> } else {
> if (Send(msg->msg_data_, msg->header_.msg_len_) == 
> NCSCC_RC_SUCCESS) {
> +send_msg_cnt++;
>   msg->is_sent_ = true;
>   m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
>   "SndQData[fseq:%u, len:%u], "
> @@ -455,7 +457,10 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t 
> chksize) {
>   msg->header_.fseq_, msg->header_.msg_len_,
>   sndwnd_.acked_.v(), sndwnd_.send_.v(), 
> sndwnd_.nacked_space_);
> } else {
> -break;
> +// If not retry, all messages are kept in queue
> +// and no more trigger to send messages
> +osaf_nanosleep();
> +continue;
> }
> }
>   }
> @@ -508,9 +513,12 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t 
> mfrag,
> DataMessage* msg = sndqueue_.Find(Seq16(fseq));
> if (msg != nullptr) {
>   // Resend the msg found
> -if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) {
> -  msg->is_sent_ = true;
> +while (Send(msg->msg_data_, msg->header_.msg_len_) != NCSCC_RC_SUCCESS) {
> +  // If not retry, all messages are kept in queue
> +  // and no more trigger to send messages
> +  osaf_nanosleep();
>   }
> +msg->is_sent_ = true;
>   m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
>   "RsndData[mseq:%u, mfrag:%u, fseq:%u], "
>   "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",



___
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel


Re: [devel] [PATCH 1/1] mds: Fix mds flow control keep all messages in queue [#3123]

2019-11-26 Thread Minh Hon Chau

Hi Thuan,

TipcPortId::Send is also called in a few other places. Do you think 
it would be good to make a wrapper around TipcPortId::Send with a few retries 
on failure, say TipcPortId::TryToSend(), and call TryToSend() instead 
of Send()?


Thanks

Minh

On 27/11/19 1:26 pm, thuan.tran wrote:

When overflow happens, mds with flow control enabled may keep
all messages in queue if it fails to send a message when receiving
Nack or ChunkAck since no more trigger come after that.
MDS flow control should retry to send message in this scenario.
---
  src/mds/mds_tipc_fctrl_portid.cc | 16 
  1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc
index 724eb7b7b..e6e179669 100644
--- a/src/mds/mds_tipc_fctrl_portid.cc
+++ b/src/mds/mds_tipc_fctrl_portid.cc
@@ -17,6 +17,7 @@
  
  #include "mds/mds_tipc_fctrl_portid.h"

  #include "base/ncssysf_def.h"
+#include "base/osaf_time.h"
  
  #include "mds/mds_dt.h"

  #include "mds/mds_log.h"
@@ -440,13 +441,14 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t 
chksize) {
  // try to send a few pending msg
  DataMessage* msg = nullptr;
  uint16_t send_msg_cnt = 0;
-while (send_msg_cnt++ < chunk_size_) {
+while (send_msg_cnt < chunk_size_) {
// find the lowest sequence unsent yet
msg = sndqueue_.FirstUnsent();
if (msg == nullptr) {
  break;
} else {
if (Send(msg->msg_data_, msg->header_.msg_len_) == 
NCSCC_RC_SUCCESS) {
+send_msg_cnt++;
  msg->is_sent_ = true;
  m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
  "SndQData[fseq:%u, len:%u], "
@@ -455,7 +457,10 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t 
chksize) {
  msg->header_.fseq_, msg->header_.msg_len_,
  sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
} else {
-break;
+// If not retry, all messages are kept in queue
+// and no more trigger to send messages
+osaf_nanosleep();
+continue;
}
}
  }
@@ -508,9 +513,12 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag,
DataMessage* msg = sndqueue_.Find(Seq16(fseq));
if (msg != nullptr) {
  // Resend the msg found
-if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) {
-  msg->is_sent_ = true;
+while (Send(msg->msg_data_, msg->header_.msg_len_) != NCSCC_RC_SUCCESS) {
+  // If not retry, all messages are kept in queue
+  // and no more trigger to send messages
+  osaf_nanosleep();
  }
+msg->is_sent_ = true;
  m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
  "RsndData[mseq:%u, mfrag:%u, fseq:%u], "
  "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",



___
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel


[devel] [PATCH 1/1] mds: Fix mds flow control keep all messages in queue [#3123]

2019-11-26 Thread thuan.tran
When overflow happens, MDS with flow control enabled may keep
all messages in the queue if it fails to send a message when receiving
a Nack or ChunkAck, since no more triggers come after that.
MDS flow control should retry sending the message in this scenario.
---
 src/mds/mds_tipc_fctrl_portid.cc | 16 
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc
index 724eb7b7b..e6e179669 100644
--- a/src/mds/mds_tipc_fctrl_portid.cc
+++ b/src/mds/mds_tipc_fctrl_portid.cc
@@ -17,6 +17,7 @@
 
 #include "mds/mds_tipc_fctrl_portid.h"
 #include "base/ncssysf_def.h"
+#include "base/osaf_time.h"
 
 #include "mds/mds_dt.h"
 #include "mds/mds_log.h"
@@ -440,13 +441,14 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t 
chksize) {
 // try to send a few pending msg
 DataMessage* msg = nullptr;
 uint16_t send_msg_cnt = 0;
-while (send_msg_cnt++ < chunk_size_) {
+while (send_msg_cnt < chunk_size_) {
   // find the lowest sequence unsent yet
   msg = sndqueue_.FirstUnsent();
   if (msg == nullptr) {
 break;
   } else {
   if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) 
{
+send_msg_cnt++;
 msg->is_sent_ = true;
 m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
 "SndQData[fseq:%u, len:%u], "
@@ -455,7 +457,10 @@ void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t 
chksize) {
 msg->header_.fseq_, msg->header_.msg_len_,
 sndwnd_.acked_.v(), sndwnd_.send_.v(), sndwnd_.nacked_space_);
   } else {
-break;
+// If not retry, all messages are kept in queue
+// and no more trigger to send messages
+osaf_nanosleep();
+continue;
   }
   }
 }
@@ -508,9 +513,12 @@ void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag,
   DataMessage* msg = sndqueue_.Find(Seq16(fseq));
   if (msg != nullptr) {
 // Resend the msg found
-if (Send(msg->msg_data_, msg->header_.msg_len_) == NCSCC_RC_SUCCESS) {
-  msg->is_sent_ = true;
+while (Send(msg->msg_data_, msg->header_.msg_len_) != NCSCC_RC_SUCCESS) {
+  // If not retry, all messages are kept in queue
+  // and no more trigger to send messages
+  osaf_nanosleep();
 }
+msg->is_sent_ = true;
 m_MDS_LOG_NOTIFY("FCTRL: [me] --> [node:%x, ref:%u], "
 "RsndData[mseq:%u, mfrag:%u, fseq:%u], "
 "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
-- 
2.17.1



___
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel