Re: [devel] [PATCH 4/9] mds: Add timeout for ack message [#1960]

2019-09-15 Thread Nguyen Minh Vu

Hi Minh,

I have minor comments below.

Regards, Vu

On 8/14/19 1:38 PM, Minh Chau wrote:

If the ack size is configured greater than 1, there should be a timeout
at receiver ends to send the ack message back to senders.
The ack message timeout utilizes the poll timeout in flow control thread
to make mds lightweight (in contrast to additional timer threads).
---
  src/mds/mds_tipc_fctrl_intf.cc   | 33 ++---
  src/mds/mds_tipc_fctrl_msg.h |  6 ++
  src/mds/mds_tipc_fctrl_portid.cc | 15 +++
  src/mds/mds_tipc_fctrl_portid.h  |  1 +
  4 files changed, 52 insertions(+), 3 deletions(-)

diff --git a/src/mds/mds_tipc_fctrl_intf.cc b/src/mds/mds_tipc_fctrl_intf.cc
index 91b9107..bd0a8f6 100644
--- a/src/mds/mds_tipc_fctrl_intf.cc
+++ b/src/mds/mds_tipc_fctrl_intf.cc
@@ -66,7 +66,8 @@ std::map portid_map;
  std::mutex portid_map_mutex;
  
  // chunk ack parameters

-// todo: The chunk ack size should be configurable
+// todo: The chunk ack timeout and chunk ack size should be configurable
+int kChunkAckTimeout = 1000;  // in miliseconds
  uint16_t kChunkAckSize = 3;
  
  TipcPortId* portid_lookup(struct tipc_portid id) {

@@ -75,6 +76,15 @@ TipcPortId* portid_lookup(struct tipc_portid id) {
return portid_map[uid];
  }
  
+void process_timer_event(const Event evt) {

+  for (auto i : portid_map) {
+TipcPortId* portid = i.second;
+if (evt.type_ == Event::Type::kEvtTmrChunkAck) {
+  portid->ReceiveTmrChunkAck();
+}
+  }
+}
+
  uint32_t process_flow_event(const Event evt) {
uint32_t rc = NCSCC_RC_SUCCESS;
TipcPortId *portid = portid_lookup(evt.id_);
@@ -110,7 +120,7 @@ uint32_t process_flow_event(const Event evt) {
  uint32_t process_all_events(void) {
enum { FD_FCTRL = 0, NUM_FDS };
  
-  int poll_tmo = MDTM_TIPC_POLL_TIMEOUT;

+  int poll_tmo = kChunkAckTimeout;
while (true) {
  int pollres;
  struct pollfd pfd[NUM_FDS] = {{0}};
@@ -135,11 +145,24 @@ uint32_t process_all_events(void) {
  if (evt == nullptr) continue;
  
  portid_map_mutex.lock();

-process_flow_event(*evt);
+
+if (evt->IsTimerEvent()) {
+  process_timer_event(*evt);
+}
+if (evt->IsFlowEvent()) {
+  process_flow_event(*evt);
+}
+

[Vu] Should log something here if the event is none of above?

  delete evt;
  portid_map_mutex.unlock();
}
  }
+// timeout, scan all portid and send ack msgs
+if (pollres == 0) {
+  portid_map_mutex.lock();
+  process_timer_event(Event(Event::Type::kEvtTmrChunkAck));
+  portid_map_mutex.unlock();
+}
}  /* while */
return NCSCC_RC_SUCCESS;
  }
@@ -368,6 +391,10 @@ uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t 
len,
portid_map_mutex.lock();
uint32_t rc = process_flow_event(Event(Event::Type::kEvtRcvData,
id, data.svc_id_, header.mseq_, header.mfrag_, header.fseq_));
+  if (rc == NCSCC_RC_CONTINUE) {
+process_timer_event(Event(Event::Type::kEvtTmrChunkAck));

[Vu] Missed to unlock the mutex here

+rc = NCSCC_RC_SUCCESS;
+  }
portid_map_mutex.unlock();
return rc;
  }
diff --git a/src/mds/mds_tipc_fctrl_msg.h b/src/mds/mds_tipc_fctrl_msg.h
index 677f256..8e6a874 100644
--- a/src/mds/mds_tipc_fctrl_msg.h
+++ b/src/mds/mds_tipc_fctrl_msg.h
@@ -44,6 +44,8 @@ class Event {
 // selective data msgs (not supported)
  kEvtDropData,  // event reported from tipc that a message is not
 // delivered
+kEvtTmrAll,
+kEvtTmrChunkAck,  // event to send the chunk ack
};
NCS_IPC_MSG next_{0};
Type type_;
@@ -68,6 +70,10 @@ class Event {
  fseq_(f_seg_num), chunk_size_(chunk_size) {
  type_ = type;
}
+  bool IsTimerEvent() { return (type_ > Type::kEvtTmrAll); }
+  bool IsFlowEvent() {
+return (Type::kEvtDataFlowAll < type_ && type_ < Type::kEvtTmrAll);
+  }
[Vu] Consider making these ones  to be constant methods if they do not 
change any of their attribute values.

  };
  
  class BaseMessage {

diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc
index 24d13ee..64115d5 100644
--- a/src/mds/mds_tipc_fctrl_portid.cc
+++ b/src/mds/mds_tipc_fctrl_portid.cc
@@ -67,6 +67,8 @@ TipcPortId::TipcPortId(struct tipc_portid id, int sock, 
uint16_t chksize,
  }
  
  TipcPortId::~TipcPortId() {

+  // Fake a TmrChunkAck event to ack all received messages
+  ReceiveTmrChunkAck();
// clear all msg in sndqueue_
sndqueue_.Clear();
  }
@@ -156,6 +158,7 @@ uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t 
mfrag,
// send ack for @chunk_size_ msgs starting from fseq
SendChunkAck(fseq, svc_id, chunk_size_);
rcvwnd_.acked_ = rcvwnd_.rcv_;
+  rc = NCSCC_RC_CONTINUE;
  }
} else {
  // todo: update rcvwnd_.nacked_space_.
@@ -258,4 +261,16 @@ void TipcPortId::ReceiveNack(uint32_t 

Re: [devel] [PATCH 3/9] mds: Add implementation for TIPC buffer overflow solution [#1960]

2019-09-15 Thread Nguyen Minh Vu

Hi Minh,

I have several comments below, started with [Vu].

Regards, Vu

On 8/14/19 1:01 PM, Minh Chau wrote:

This is a collaborative patch of two participants:
- Tran Thuan 
- Minh Chau 

Main changes:
- Add mds_tipc_fctrl_intf.h, mds_tipc_fctrl_intf.cc: These two files
introduce new functions which are called in mds_dt_tipc.c if the flow
control is enabled
- Add mds_tipc_fctrl_portid.h, mds_tipc_fctrl_portid.cc: These files
implements the tipc portid instance, which supports the sliding window,
mds msg queue
- Add mds_tipc_fctrl_msg.h, mds_tipc_fctrl_msg.cc: These files define
the event and messages which are used for this solution.
---
  src/mds/Makefile.am  |  10 +-
  src/mds/mds_dt.h |   8 +-
  src/mds/mds_dt_tipc.c| 188 +---
  src/mds/mds_tipc_fctrl_intf.cc   | 376 +++
  src/mds/mds_tipc_fctrl_intf.h|  47 +
  src/mds/mds_tipc_fctrl_msg.cc| 142 +++
  src/mds/mds_tipc_fctrl_msg.h | 129 ++
  src/mds/mds_tipc_fctrl_portid.cc | 261 +++
  src/mds/mds_tipc_fctrl_portid.h  |  87 +
  9 files changed, 1184 insertions(+), 64 deletions(-)
  create mode 100644 src/mds/mds_tipc_fctrl_intf.cc
  create mode 100644 src/mds/mds_tipc_fctrl_intf.h
  create mode 100644 src/mds/mds_tipc_fctrl_msg.cc
  create mode 100644 src/mds/mds_tipc_fctrl_msg.h
  create mode 100644 src/mds/mds_tipc_fctrl_portid.cc
  create mode 100644 src/mds/mds_tipc_fctrl_portid.h

diff --git a/src/mds/Makefile.am b/src/mds/Makefile.am
index 2d7b652..d849e8f 100644
--- a/src/mds/Makefile.am
+++ b/src/mds/Makefile.am
@@ -48,10 +48,16 @@ lib_libopensaf_core_la_SOURCES += \
  if ENABLE_TIPC_TRANSPORT
  noinst_HEADERS += src/mds/mds_dt_tipc.h \
src/mds/mds_tipc_recvq_stats.h \
-   src/mds/mds_tipc_recvq_stats_impl.h
+   src/mds/mds_tipc_recvq_stats_impl.h \
+   src/mds/mds_tipc_fctrl_intf.h \
+   src/mds/mds_tipc_fctrl_portid.h \
+   src/mds/mds_tipc_fctrl_msg.h
  lib_libopensaf_core_la_SOURCES += src/mds/mds_dt_tipc.c \
src/mds/mds_tipc_recvq_stats.cc \
-   src/mds/mds_tipc_recvq_stats_impl.cc
+   src/mds/mds_tipc_recvq_stats_impl.cc \
+   src/mds/mds_tipc_fctrl_intf.cc \
+   src/mds/mds_tipc_fctrl_portid.cc \
+   src/mds/mds_tipc_fctrl_msg.cc
  endif
  
  if ENABLE_TESTS

diff --git a/src/mds/mds_dt.h b/src/mds/mds_dt.h
index b645bb4..d9e8633 100644
--- a/src/mds/mds_dt.h
+++ b/src/mds/mds_dt.h
@@ -162,7 +162,7 @@ uint32_t mdtm_del_from_ref_tbl(MDS_SUBTN_REF_VAL ref);
  uint32_t mds_tmr_mailbox_processing(void);
  uint32_t mdtm_get_from_ref_tbl(MDS_SUBTN_REF_VAL ref, MDS_SVC_HDL *svc_hdl);
  uint32_t mdtm_add_frag_hdr(uint8_t *buf_ptr, uint16_t len, uint32_t seq_num,
-   uint16_t frag_byte);
+   uint16_t frag_byte, uint16_t fctrl_seq_num);
  uint32_t mdtm_free_reassem_msg_mem(MDS_ENCODED_MSG *msg);
  uint32_t mdtm_process_recv_data(uint8_t *buf, uint16_t len, uint64_t tipc_id,
  uint32_t *buff_dump);
@@ -240,9 +240,13 @@ bool mdtm_mailbox_mbx_cleanup(NCSCONTEXT arg, NCSCONTEXT 
msg);
  
  #define MDS_PROT 0xA0

  #define MDS_VERSION 0x08
-#define MDS_PROT_VER_MASK (MDS_PROT | MDS_VERSION)
+#define MDS_PROT_VER_MASK 0xFC
  #define MDTM_PRI_MASK 0x3
  
+/* MDS protocol/version for flow control */

+#define MDS_PROT_FCTRL (0xB0 | MDS_VERSION)
+#define MDS_PROT_FCTRL_ID 0x00AC13F5
+
  /* Added for the subscription changes */
  #define MDS_NCS_CHASSIS_ID (m_NCS_GET_NODE_ID & 0x00ff)
  #define MDS_TIPC_COMMON_ID 0x01001000
diff --git a/src/mds/mds_dt_tipc.c b/src/mds/mds_dt_tipc.c
index 86b52bb..fef1c50 100644
--- a/src/mds/mds_dt_tipc.c
+++ b/src/mds/mds_dt_tipc.c
@@ -47,6 +47,7 @@
  #include "mds_dt_tipc.h"
  #include "mds_dt_tcp_disc.h"
  #include "mds_core.h"
+#include "mds_tipc_fctrl_intf.h"
  #include "mds_tipc_recvq_stats.h"
  #include "base/osaf_utility.h"
  #include "base/osaf_poll.h"
@@ -165,20 +166,22 @@ NCS_PATRICIA_TREE mdtm_reassembly_list;
  uint32_t mdtm_global_frag_num;
  
  const unsigned int MAX_RECV_THRESHOLD = 30;

+uint8_t gl_mds_pro_ver = MDS_PROT | MDS_VERSION;
  
-static bool get_tipc_port_id(int sock, uint32_t* port_id) {

+static bool get_tipc_port_id(int sock, struct tipc_portid* port_id) {
struct sockaddr_tipc addr;
socklen_t sz = sizeof(addr);
  
  	memset(, 0, sizeof(addr));

-   *port_id = 0;
+   port_id->node = 0;
+   port_id->ref = 0;
if (0 > getsockname(sock, (struct sockaddr *), )) {
syslog(LOG_ERR, "MDTM:TIPC Failed to get socket name, err: %s",
   strerror(errno));
return false;
}
  
-	*port_id = addr.addr.id.ref;

+   *port_id = addr.addr.id;
return true;
  }
  
@@ -240,12 +243,13 @@ uint32_t mdtm_tipc_init(NODE_ID nodeid, uint32_t *mds_tipc_ref)

}
  
  	/* Code for getting the self tipc random number 

Re: [devel] [PATCH 3/9] mds: Add implementation for TIPC buffer overflow solution [#1960]

2019-09-15 Thread Minh Hon Chau

Hi Hans, Gary, Vu

Do you have any comments on remaining patches?

Thanks

Minh

On 11/9/19 11:01 am, Minh Hon Chau wrote:

Hi Gary,

Thanks for the review, please find comments with [M].

/Minh

On 10/9/19 6:02 pm, Gary Lee wrote:

Hi Minh & Thuan

Some minor comments marked with [GL].

On 14/8/19 4:38 pm, Minh Chau wrote:

This is a collaborative patch of two participants:Thuan, Minh.

Main changes:
- Add mds_tipc_fctrl_intf.h, mds_tipc_fctrl_intf.cc: These two files
introduce new functions which are called in mds_dt_tipc.c if the flow
control is enabled
- Add mds_tipc_fctrl_portid.h, mds_tipc_fctrl_portid.cc: These files
implements the tipc portid instance, which supports the sliding window,
mds msg queue
- Add mds_tipc_fctrl_msg.h, mds_tipc_fctrl_msg.cc: These files define
the event and messages which are used for this solution.
---
  src/mds/Makefile.am  |  10 +-
  src/mds/mds_dt.h |   8 +-
  src/mds/mds_dt_tipc.c    | 188 +---
  src/mds/mds_tipc_fctrl_intf.cc   | 376 
+++

  src/mds/mds_tipc_fctrl_intf.h    |  47 +
  src/mds/mds_tipc_fctrl_msg.cc    | 142 +++
  src/mds/mds_tipc_fctrl_msg.h | 129 ++
  src/mds/mds_tipc_fctrl_portid.cc | 261 +++
  src/mds/mds_tipc_fctrl_portid.h  |  87 +
  9 files changed, 1184 insertions(+), 64 deletions(-)
  create mode 100644 src/mds/mds_tipc_fctrl_intf.cc
  create mode 100644 src/mds/mds_tipc_fctrl_intf.h
  create mode 100644 src/mds/mds_tipc_fctrl_msg.cc
  create mode 100644 src/mds/mds_tipc_fctrl_msg.h
  create mode 100644 src/mds/mds_tipc_fctrl_portid.cc
  create mode 100644 src/mds/mds_tipc_fctrl_portid.h

diff --git a/src/mds/Makefile.am b/src/mds/Makefile.am
index 2d7b652..d849e8f 100644
--- a/src/mds/Makefile.am
+++ b/src/mds/Makefile.am
@@ -48,10 +48,16 @@ lib_libopensaf_core_la_SOURCES += \
  if ENABLE_TIPC_TRANSPORT
  noinst_HEADERS += src/mds/mds_dt_tipc.h \
  src/mds/mds_tipc_recvq_stats.h \
-    src/mds/mds_tipc_recvq_stats_impl.h
+    src/mds/mds_tipc_recvq_stats_impl.h \
+    src/mds/mds_tipc_fctrl_intf.h \
+    src/mds/mds_tipc_fctrl_portid.h \
+    src/mds/mds_tipc_fctrl_msg.h
  lib_libopensaf_core_la_SOURCES += src/mds/mds_dt_tipc.c \
  src/mds/mds_tipc_recvq_stats.cc \
-    src/mds/mds_tipc_recvq_stats_impl.cc
+    src/mds/mds_tipc_recvq_stats_impl.cc \
+    src/mds/mds_tipc_fctrl_intf.cc \
+    src/mds/mds_tipc_fctrl_portid.cc \
+    src/mds/mds_tipc_fctrl_msg.cc
  endif
    if ENABLE_TESTS
diff --git a/src/mds/mds_dt.h b/src/mds/mds_dt.h
index b645bb4..d9e8633 100644
--- a/src/mds/mds_dt.h
+++ b/src/mds/mds_dt.h
@@ -162,7 +162,7 @@ uint32_t mdtm_del_from_ref_tbl(MDS_SUBTN_REF_VAL 
ref);

  uint32_t mds_tmr_mailbox_processing(void);
  uint32_t mdtm_get_from_ref_tbl(MDS_SUBTN_REF_VAL ref, MDS_SVC_HDL 
*svc_hdl);
  uint32_t mdtm_add_frag_hdr(uint8_t *buf_ptr, uint16_t len, 
uint32_t seq_num,

-   uint16_t frag_byte);
+   uint16_t frag_byte, uint16_t 
fctrl_seq_num);

  uint32_t mdtm_free_reassem_msg_mem(MDS_ENCODED_MSG *msg);
  uint32_t mdtm_process_recv_data(uint8_t *buf, uint16_t len, 
uint64_t tipc_id,

  uint32_t *buff_dump);
@@ -240,9 +240,13 @@ bool mdtm_mailbox_mbx_cleanup(NCSCONTEXT arg, 
NCSCONTEXT msg);

    #define MDS_PROT 0xA0
  #define MDS_VERSION 0x08
-#define MDS_PROT_VER_MASK (MDS_PROT | MDS_VERSION)
+#define MDS_PROT_VER_MASK 0xFC
  #define MDTM_PRI_MASK 0x3
  +/* MDS protocol/version for flow control */
+#define MDS_PROT_FCTRL (0xB0 | MDS_VERSION)
+#define MDS_PROT_FCTRL_ID 0x00AC13F5
+
  /* Added for the subscription changes */
  #define MDS_NCS_CHASSIS_ID (m_NCS_GET_NODE_ID & 0x00ff)
  #define MDS_TIPC_COMMON_ID 0x01001000
diff --git a/src/mds/mds_dt_tipc.c b/src/mds/mds_dt_tipc.c
index 86b52bb..fef1c50 100644
--- a/src/mds/mds_dt_tipc.c
+++ b/src/mds/mds_dt_tipc.c
@@ -47,6 +47,7 @@
  #include "mds_dt_tipc.h"
  #include "mds_dt_tcp_disc.h"
  #include "mds_core.h"
+#include "mds_tipc_fctrl_intf.h"
  #include "mds_tipc_recvq_stats.h"
  #include "base/osaf_utility.h"
  #include "base/osaf_poll.h"
@@ -165,20 +166,22 @@ NCS_PATRICIA_TREE mdtm_reassembly_list;
  uint32_t mdtm_global_frag_num;
    const unsigned int MAX_RECV_THRESHOLD = 30;
+uint8_t gl_mds_pro_ver = MDS_PROT | MDS_VERSION;
  -static bool get_tipc_port_id(int sock, uint32_t* port_id) {
+static bool get_tipc_port_id(int sock, struct tipc_portid* port_id) {
  struct sockaddr_tipc addr;
  socklen_t sz = sizeof(addr);
    memset(, 0, sizeof(addr));
-    *port_id = 0;
+    port_id->node = 0;
+    port_id->ref = 0;
  if (0 > getsockname(sock, (struct sockaddr *), )) {
  syslog(LOG_ERR, "MDTM:TIPC Failed to get socket name, err: 
%s",

 strerror(errno));
  return false;
  }
  -    *port_id = addr.addr.id.ref;
+    *port_id = addr.addr.id;
  return true;
  }
  @@ -240,12 +243,13