Hi Vu,

I see it, will add.

Thanks

Minh

On 16/9/19 4:21 pm, Nguyen Minh Vu wrote:
Hi Minh,

See my responses to your comments below, starting with [Vu2].

Regards, Vu

On 9/16/19 1:06 PM, Minh Hon Chau wrote:
Hi Vu,

Several comments with [M] too :).

Thanks

Minh

On 16/9/19 2:24 pm, Nguyen Minh Vu wrote:
Hi Minh,

I have several comments below, starting with [Vu].

Regards, Vu

On 8/14/19 1:01 PM, Minh Chau wrote:
This is a collaborative patch of two participants:
- Tran Thuan <thuan.t...@dektech.com.au>
- Minh Chau <minh.c...@dektech.com.au>

Main changes:
- Add mds_tipc_fctrl_intf.h, mds_tipc_fctrl_intf.cc: These two files
introduce new functions which are called in mds_dt_tipc.c if the flow
control is enabled
- Add mds_tipc_fctrl_portid.h, mds_tipc_fctrl_portid.cc: These files
implement the tipc portid instance, which supports the sliding window and
the mds msg queue
- Add mds_tipc_fctrl_msg.h, mds_tipc_fctrl_msg.cc: These files define
the event and messages which are used for this solution.
---
  src/mds/Makefile.am              |  10 +-
  src/mds/mds_dt.h                 |   8 +-
  src/mds/mds_dt_tipc.c            | 188 +++++++++++++-------
  src/mds/mds_tipc_fctrl_intf.cc   | 376 +++++++++++++++++++++++++++++++++++++++
  src/mds/mds_tipc_fctrl_intf.h    |  47 +++++
  src/mds/mds_tipc_fctrl_msg.cc    | 142 +++++++++++++++
  src/mds/mds_tipc_fctrl_msg.h     | 129 ++++++++++++++
  src/mds/mds_tipc_fctrl_portid.cc | 261 +++++++++++++++++++++++++++
  src/mds/mds_tipc_fctrl_portid.h  |  87 +++++++++
  9 files changed, 1184 insertions(+), 64 deletions(-)
  create mode 100644 src/mds/mds_tipc_fctrl_intf.cc
  create mode 100644 src/mds/mds_tipc_fctrl_intf.h
  create mode 100644 src/mds/mds_tipc_fctrl_msg.cc
  create mode 100644 src/mds/mds_tipc_fctrl_msg.h
  create mode 100644 src/mds/mds_tipc_fctrl_portid.cc
  create mode 100644 src/mds/mds_tipc_fctrl_portid.h

diff --git a/src/mds/Makefile.am b/src/mds/Makefile.am
index 2d7b652..d849e8f 100644
--- a/src/mds/Makefile.am
+++ b/src/mds/Makefile.am
@@ -48,10 +48,16 @@ lib_libopensaf_core_la_SOURCES += \
  if ENABLE_TIPC_TRANSPORT
  noinst_HEADERS += src/mds/mds_dt_tipc.h \
      src/mds/mds_tipc_recvq_stats.h \
-    src/mds/mds_tipc_recvq_stats_impl.h
+    src/mds/mds_tipc_recvq_stats_impl.h \
+    src/mds/mds_tipc_fctrl_intf.h \
+    src/mds/mds_tipc_fctrl_portid.h \
+    src/mds/mds_tipc_fctrl_msg.h
  lib_libopensaf_core_la_SOURCES += src/mds/mds_dt_tipc.c \
      src/mds/mds_tipc_recvq_stats.cc \
-    src/mds/mds_tipc_recvq_stats_impl.cc
+    src/mds/mds_tipc_recvq_stats_impl.cc \
+    src/mds/mds_tipc_fctrl_intf.cc \
+    src/mds/mds_tipc_fctrl_portid.cc \
+    src/mds/mds_tipc_fctrl_msg.cc
  endif
    if ENABLE_TESTS
diff --git a/src/mds/mds_dt.h b/src/mds/mds_dt.h
index b645bb4..d9e8633 100644
--- a/src/mds/mds_dt.h
+++ b/src/mds/mds_dt.h
@@ -162,7 +162,7 @@ uint32_t mdtm_del_from_ref_tbl(MDS_SUBTN_REF_VAL ref);
  uint32_t mds_tmr_mailbox_processing(void);
  uint32_t mdtm_get_from_ref_tbl(MDS_SUBTN_REF_VAL ref, MDS_SVC_HDL *svc_hdl);   uint32_t mdtm_add_frag_hdr(uint8_t *buf_ptr, uint16_t len, uint32_t seq_num,
-                           uint16_t frag_byte);
+                           uint16_t frag_byte, uint16_t fctrl_seq_num);
  uint32_t mdtm_free_reassem_msg_mem(MDS_ENCODED_MSG *msg);
  uint32_t mdtm_process_recv_data(uint8_t *buf, uint16_t len, uint64_t tipc_id,
                                  uint32_t *buff_dump);
@@ -240,9 +240,13 @@ bool mdtm_mailbox_mbx_cleanup(NCSCONTEXT arg, NCSCONTEXT msg);
    #define MDS_PROT 0xA0
  #define MDS_VERSION 0x08
-#define MDS_PROT_VER_MASK (MDS_PROT | MDS_VERSION)
+#define MDS_PROT_VER_MASK 0xFC
  #define MDTM_PRI_MASK 0x3
  +/* MDS protocol/version for flow control */
+#define MDS_PROT_FCTRL (0xB0 | MDS_VERSION)
+#define MDS_PROT_FCTRL_ID 0x00AC13F5
+
  /* Added for the subscription changes */
  #define MDS_NCS_CHASSIS_ID (m_NCS_GET_NODE_ID & 0x00ff0000)
  #define MDS_TIPC_COMMON_ID 0x01001000
diff --git a/src/mds/mds_dt_tipc.c b/src/mds/mds_dt_tipc.c
index 86b52bb..fef1c50 100644
--- a/src/mds/mds_dt_tipc.c
+++ b/src/mds/mds_dt_tipc.c
@@ -47,6 +47,7 @@
  #include "mds_dt_tipc.h"
  #include "mds_dt_tcp_disc.h"
  #include "mds_core.h"
+#include "mds_tipc_fctrl_intf.h"
  #include "mds_tipc_recvq_stats.h"
  #include "base/osaf_utility.h"
  #include "base/osaf_poll.h"
@@ -165,20 +166,22 @@ NCS_PATRICIA_TREE mdtm_reassembly_list;
  uint32_t mdtm_global_frag_num;
    const unsigned int MAX_RECV_THRESHOLD = 30;
+uint8_t gl_mds_pro_ver = MDS_PROT | MDS_VERSION;
  -static bool get_tipc_port_id(int sock, uint32_t* port_id) {
+static bool get_tipc_port_id(int sock, struct tipc_portid* port_id) {
      struct sockaddr_tipc addr;
      socklen_t sz = sizeof(addr);
        memset(&addr, 0, sizeof(addr));
-    *port_id = 0;
+    port_id->node = 0;
+    port_id->ref = 0;
      if (0 > getsockname(sock, (struct sockaddr *)&addr, &sz)) {
          syslog(LOG_ERR, "MDTM:TIPC Failed to get socket name, err: %s",
                 strerror(errno));
          return false;
      }
  -    *port_id = addr.addr.id.ref;
+    *port_id = addr.addr.id;
      return true;
  }
  @@ -240,12 +243,13 @@ uint32_t mdtm_tipc_init(NODE_ID nodeid, uint32_t *mds_tipc_ref)
      }
        /* Code for getting the self tipc random number */
-    if (!get_tipc_port_id(tipc_cb.BSRsock, mds_tipc_ref)) {
+    struct tipc_portid port_id;
+    if (!get_tipc_port_id(tipc_cb.BSRsock, &port_id)) {
          close(tipc_cb.Dsock);
          close(tipc_cb.BSRsock);
          return NCSCC_RC_FAILURE;
      }
-
+    *mds_tipc_ref = port_id.ref;
      tipc_cb.adest = ((uint64_t)(nodeid)) << 32;
      tipc_cb.adest |= *mds_tipc_ref;
      tipc_cb.node_id = nodeid;
@@ -325,6 +329,23 @@ uint32_t mdtm_tipc_init(NODE_ID nodeid, uint32_t *mds_tipc_ref)
          mdtm_set_transport(MDTM_TX_TYPE_TIPC);
      }
  +    /* Get tipc socket receive buffer size */
+    int optval;
+    socklen_t optlen = sizeof(optval);
+    if (getsockopt(tipc_cb.BSRsock, SOL_SOCKET, SO_RCVBUF,
+        &optval, &optlen) != 0) {
+        syslog(LOG_ERR, "MDTM: getsockopt() failed to get rcv buf size to: %str",
+            strerror(errno));
+        close(tipc_cb.Dsock);
+        close(tipc_cb.BSRsock);
+        return NCSCC_RC_FAILURE;
+    }
+
+    /* Create flow control tasks if enabled*/
+    gl_mds_pro_ver = MDS_PROT_FCTRL;
+    mds_tipc_fctrl_initialize(tipc_cb.BSRsock, port_id,
+        (uint64_t)optval, tipc_mcast_enabled);
[Vu] Should we add error handling here? What if this function returns failure?

[M]: If this function returns failure, the flag @is_fctrl_enabled is off and all mds_tipc_fctrl_xxx calls are disabled. I am not sure what error handling we can add here, though; there is already an error log inside mds_tipc_fctrl_initialize() on any failure. But your comment has revealed that @mbx_events is not released if create_ncs_task() fails. I will update that point, thanks!

and if this initialization is successful, then should mds_tipc_fctrl_shutdown() be called to clean up allocated resources?
[M]: mds_tipc_fctrl_shutdown() is called in mdtm_tipc_destroy()
[Vu2] My concern is about the code below. Should mds_tipc_fctrl_shutdown() be called to clean up allocated resources in the case below?

      mds_tipc_fctrl_initialize(tipc_cb.BSRsock, port_id, (uint64_t)optval,
                ackto, acksize, tipc_mcast_enabled);

    /* Create a task to receive the events and data */
    if (mdtm_create_rcv_task(tipc_cb.hdle_mdtm) != NCSCC_RC_SUCCESS) {
        syslog(LOG_ERR,
               "MDTM:TIPC Receive Task Creation Failed in MDTM_INIT\n");
        close(tipc_cb.Dsock);
        close(tipc_cb.BSRsock);
        m_NCS_IPC_RELEASE(&tipc_cb.tmr_mbx, NULL);
[Vu2] --> call mds_tipc_fctrl_shutdown() here?
        return NCSCC_RC_FAILURE;
    }
+
      /* Create a task to receive the events and data */
      if (mdtm_create_rcv_task(tipc_cb.hdle_mdtm) != NCSCC_RC_SUCCESS) {
          syslog(LOG_ERR,
@@ -469,7 +490,7 @@ uint32_t mdtm_tipc_destroy(void)
               NULL);
      m_NCS_IPC_RELEASE(&tipc_cb.tmr_mbx,
                (NCS_IPC_CB)mdtm_mailbox_mbx_cleanup);
-
+    mds_tipc_fctrl_shutdown();
      /* Clear reference hdl list */
      while (mdtm_ref_hdl_list_hdr != NULL) {
          /* Store temporary the pointer of entry to be deleted */
@@ -672,36 +693,26 @@ ssize_t recvfrom_connectionless(int sd, void *buf, size_t nbytes, int flags,
                                     0));
                      if (anc_data[0] == TIPC_ERR_OVERLOAD) {
                          LOG_ER(
-                            "MDTM: From <0x%"PRIx32 ":%"PRIu32 "> undeliverable message condition ancillary data: TIPC_ERR_OVERLOAD", -                            ((struct sockaddr_tipc*) from)->addr.id.node, -                            ((struct sockaddr_tipc*) from)->addr.id.ref);
-                        m_MDS_LOG_CRITICAL(
                              "MDTM: From <0x%"PRIx32 ":%"PRIu32 "> undeliverable message condition ancillary data: TIPC_ERR_OVERLOAD ",                               ((struct sockaddr_tipc*) from)->addr.id.node,                               ((struct sockaddr_tipc*) from)->addr.id.ref);
-                    } else {
+                    } else if (anc_data[0] == TIPC_ERR_NO_PORT){
                          /* TIPC_ERRINFO - TIPC error
                           * code associated with a
                           * returned data message or a
                           * connection termination
                           * message */
-                        m_MDS_LOG_DBG(
-                            "MDTM: undelivered message condition ancillary data: TIPC_ERRINFO err : %d",
-                            anc_data[0]);
+ mds_tipc_fctrl_portid_terminate(((struct sockaddr_tipc*)from)->addr.id);
+                    } else {
+                        m_MDS_LOG_ERR("MDTM:: TIPC_ERRINFO anc_data[0]:%u", anc_data[0]);
                      }
                  } else if (anc->cmsg_type == TIPC_RETDATA) {
-                    /* If we set TIPC_DEST_DROPPABLE off
-                       message (configure TIPC to return
-                       rejected messages to the sender ) we
-                       will hit this when we implement MDS
-                       retransmit lost messages, can be
-                       replaced with flow control logic */
                      /* TIPC_RETDATA -The contents of a
                       * returned data message */
-                    LOG_IN(
-                        "MDTM: undelivered message condition ancillary data: TIPC_RETDATA");
-                    m_MDS_LOG_INFO(
-                        "MDTM: undelivered message condition ancillary data: TIPC_RETDATA"); +                    LOG_IN("MDTM: undelivered message condition ancillary data: TIPC_RETDATA"); +                    uint16_t ret_msg_len = anc->cmsg_len - sizeof(*anc);
+                    unsigned char *ret_msg = CMSG_DATA(anc);
+                    mds_tipc_fctrl_drop_data(ret_msg, ret_msg_len, ((struct sockaddr_tipc*)from)->addr.id);
[Vu] What if this function returns failure?
[M] There is not much we can do if it fails (it means sending the ipc msg failed), but we can add an error log here.
                  } else if (anc->cmsg_type == TIPC_DESTNAME) {
                      if (sz == 0) {
                          m_MDS_LOG_DBG(
@@ -822,7 +833,7 @@ static uint32_t mdtm_process_recv_events(void)
                                  m_MDS_LOG_INFO(
                                      "MDTM: Published: ");
                                  m_MDS_LOG_INFO(
-                                    "MDTM: <%u,%u,%u> port id <0x%08x:%u>\n", +                                    "MDTM: <%x,%x,%x> port id <0x%08x:%u>\n",
                                      NTOHL(
                                      event.s
                                          .seq
@@ -841,7 +852,12 @@ static uint32_t mdtm_process_recv_events(void)
                                      event
                                          .port
                                          .ref));
-
+                                struct tipc_portid id = {
+                                    .node = NTOHL(event.port.node),
+                                    .ref = NTOHL(event.port.ref)
+                                };
+                                uint32_t type = NTOHL(event.s.seq.type);
+ mds_tipc_fctrl_portid_up(id, type);
                                  if (NCSCC_RC_SUCCESS !=
mdtm_process_discovery_events(
                                      TIPC_PUBLISHED,
@@ -873,7 +889,12 @@ static uint32_t mdtm_process_recv_events(void)
                                      event
                                          .port
                                          .ref));
-
+                                struct tipc_portid id = {
+                                    .node = NTOHL(event.port.node),
+                                    .ref = NTOHL(event.port.ref)
+                                };
+                                uint32_t type = NTOHL(event.s.seq.type);
+ mds_tipc_fctrl_portid_down(id, type);
                                  if (NCSCC_RC_SUCCESS !=
mdtm_process_discovery_events(
                                      TIPC_WITHDRAWN,
@@ -986,8 +1007,6 @@ static uint32_t mdtm_process_recv_events(void)
                          break;
                      }
                      if (recd_bytes == 0) {
-                        m_MDS_LOG_DBG(
-                            "MDTM: recd bytes=0 on received on sock, abnormal/unknown/hack  condition. Ignoring");
                          break;
                      }
                      data = inbuf;
@@ -1076,9 +1095,10 @@ static uint32_t mdtm_process_recv_events(void)
                                  &buff_dump);
                          }
  #else
-                        mdtm_process_recv_data(
-                            &inbuf[2], recd_bytes - 2,
-                            tipc_id, &buff_dump);
+                        if (mds_tipc_fctrl_rcv_data(inbuf, recd_bytes, client_addr.addr.id)
+                            == NCSCC_RC_SUCCESS) {
+ mdtm_process_recv_data(&inbuf[2], recd_bytes - 2, tipc_id, &buff_dump);
+                        }
  #endif
                      } else {
                          uint64_t tipc_id;
@@ -1873,9 +1893,9 @@ uint32_t mds_mdtm_svc_install_tipc(PW_ENV_ID pwe_id, MDS_SVC_ID svc_id,
      server_addr.addr.nameseq.upper = server_inst;
        /* The self tipc random port number */
-    uint32_t port_id = 0;
+  struct tipc_portid port_id;
      get_tipc_port_id(tipc_cb.BSRsock, &port_id);
-    m_MDS_LOG_NOTIFY("MDTM: install_tipc : <p:%u,s:%u,i:%u>", port_id, +    m_MDS_LOG_NOTIFY("MDTM: install_tipc : <p:%u,s:%u,i:%u>", port_id.ref,
               server_type, server_inst);
        if (0 != bind(tipc_cb.BSRsock, (struct sockaddr *)&server_addr,
@@ -2495,6 +2515,7 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ *req)
       */
      uint32_t status = 0;
      uint32_t sum_mds_hdr_plus_mdtm_hdr_plus_len;
+  uint16_t fctrl_seq_num = 0;
      int version = req->msg_arch_word & 0x7;
      if (version > 1) {
          sum_mds_hdr_plus_mdtm_hdr_plus_len =
@@ -2583,11 +2604,16 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ *req)
                  mdtm_add_mds_hdr(buffer_ack, req)) {
                  return NCSCC_RC_FAILURE;
              }
-
+          /* if sndqueue is capable, then obtain the current sending seq */ +          if (mds_tipc_fctrl_sndqueue_capable(tipc_id, len, &fctrl_seq_num)
+              == NCSCC_RC_FAILURE){
+            m_MDS_LOG_ERR("FCTRL: Failed to send message len :%d", len);
+            return NCSCC_RC_FAILURE;
+          }
              /* Add frag_hdr */
              if (NCSCC_RC_SUCCESS !=
                  mdtm_add_frag_hdr(buffer_ack, len, frag_seq_num,
-                          0)) {
+                          0, fctrl_seq_num)) {
                  return NCSCC_RC_FAILURE;
              }
  @@ -2676,13 +2702,22 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ *req)
                      free(body);
                      return NCSCC_RC_FAILURE;
                  }
+                /* if sndqueue is capable, then obtain the current sending seq */
+                if (mds_tipc_fctrl_sndqueue_capable(tipc_id,
+                    len + sum_mds_hdr_plus_mdtm_hdr_plus_len,
+                    &fctrl_seq_num) == NCSCC_RC_FAILURE){
+                    m_MDS_LOG_ERR("FCTRL: Failed to send message len :%d",
+                        len + sum_mds_hdr_plus_mdtm_hdr_plus_len);
+                    free(body);
[Vu] Is it required to call m_MMGR_FREE_BUFR_LIST(usrbuf) here too?
[M]: Yes, I will update it.
+                    return NCSCC_RC_FAILURE;
+                }
                    if (NCSCC_RC_SUCCESS !=
                      mdtm_add_frag_hdr(
                      body,
                      (len +
                       sum_mds_hdr_plus_mdtm_hdr_plus_len),
-                    frag_seq_num, 0)) {
+                    frag_seq_num, 0, fctrl_seq_num)) {
                      m_MDS_LOG_ERR(
                          "MDTM: Unable to add the frag Hdr to the send msg\n");
                      m_MMGR_FREE_BUFR_LIST(usrbuf);
@@ -2778,12 +2813,23 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ *req)
                      req->msg.data.buff_info.buff);
                  return NCSCC_RC_FAILURE;
              }
+            /* if sndqueue is capable, then obtain the current sending seq */
+            if (mds_tipc_fctrl_sndqueue_capable(tipc_id,
+                req->msg.data.buff_info.len + sum_mds_hdr_plus_mdtm_hdr_plus_len,
+                &fctrl_seq_num) == NCSCC_RC_FAILURE) {
+                m_MDS_LOG_ERR("FCTRL: Failed to send message len :%d", +                    req->msg.data.buff_info.len + sum_mds_hdr_plus_mdtm_hdr_plus_len);
+                free(body);
+ mds_free_direct_buff(req->msg.data.buff_info.buff);
+                return NCSCC_RC_FAILURE;
+            }
+
              if (NCSCC_RC_SUCCESS !=
                  mdtm_add_frag_hdr(
                  body,
                  req->msg.data.buff_info.len +
                      sum_mds_hdr_plus_mdtm_hdr_plus_len,
-                frag_seq_num, 0)) {
+                frag_seq_num, 0, fctrl_seq_num)) {
                  m_MDS_LOG_ERR(
                      "MDTM: Unable to add the frag Hdr to the send msg\n");
                  free(body);
@@ -2860,6 +2906,7 @@ uint32_t mdtm_frag_and_send(MDTM_SEND_REQ *req, uint32_t seq_num,
      uint16_t i = 1;
      uint16_t frag_val = 0;
      uint32_t sum_mds_hdr_plus_mdtm_hdr_plus_len;
+    uint16_t fctrl_seq_num = 0;
      int version = req->msg_arch_word & 0x7;
      uint32_t ret = NCSCC_RC_SUCCESS;
  @@ -2938,9 +2985,18 @@ uint32_t mdtm_frag_and_send(MDTM_SEND_REQ *req, uint32_t seq_num,
                      return NCSCC_RC_FAILURE;
                  }
              }
+            /* if sndqueue is capable, then obtain the current sending seq */ +            if (mds_tipc_fctrl_sndqueue_capable(id, len_buf, &fctrl_seq_num)
+                == NCSCC_RC_FAILURE) {
+                m_MDS_LOG_ERR("FCTRL: Failed to send message len :%d", len_buf);
+                m_MMGR_FREE_BUFR_LIST(usrbuf);
+                free(body);
+                return NCSCC_RC_FAILURE;
+            }
+
              if (NCSCC_RC_SUCCESS !=
                  mdtm_add_frag_hdr(body, len_buf, seq_num,
-                          frag_val)) {
+                          frag_val, fctrl_seq_num)) {
                  m_MDS_LOG_ERR(
                      "MDTM: Frag hde addition failed\n");
                  m_MMGR_FREE_BUFR_LIST(usrbuf);
@@ -2996,7 +3052,7 @@ uint32_t mdtm_frag_and_send(MDTM_SEND_REQ *req, uint32_t seq_num,
  *********************************************************/
    uint32_t mdtm_add_frag_hdr(uint8_t *buf_ptr, uint16_t len, uint32_t seq_num,
-               uint16_t frag_byte)
+               uint16_t frag_byte, uint16_t fctrl_seq_num)
  {
      /* Add the FRAG HDR to the Buffer */
      uint8_t *data;
@@ -3013,9 +3069,17 @@ uint32_t mdtm_add_frag_hdr(uint8_t *buf_ptr, uint16_t len, uint32_t seq_num,
                      5); /* 2 bytes for keeping len, to cross
                         check at the receiver end */
  #else
-    ncs_encode_16bit(&data, len - MDTM_FRAG_HDR_LEN -
-                    2); /* 2 bytes for keeping len, to cross
-                       check at the receiver end */
+    /* Next 2 bytes for keeping len, to cross check at the receiver end.
+     * Used to be encoded as below:
+     *
+     * ncs_encode_16bit(&data, len - MDTM_FRAG_HDR_LEN - 2);
+     *
+     * However, these 2 bytes have been never examined at receiver. As backward +     * compatibility, the fragment header and mds data header can't be extended, +     * hereafter, these 2 bytes will be used as sequence number in flow control
+     * (per tipc portid)
+     * */
+    ncs_encode_16bit(&data, fctrl_seq_num);
  #endif
      return NCSCC_RC_SUCCESS;
  }
@@ -3060,21 +3124,25 @@ static uint32_t mdtm_sendto(uint8_t *buffer, uint16_t buff_len,
          buffer[4] = checksum;
      }
  #endif
-
-    send_len = sendto(tipc_cb.BSRsock, buffer, buff_len, 0,
-              (struct sockaddr *)&server_addr, sizeof(server_addr));
-    if (send_len == buff_len) {
-        m_MDS_LOG_INFO("MDTM: Successfully sent message");
-        return NCSCC_RC_SUCCESS;
-    } else if (send_len == -1) {
-        m_MDS_LOG_ERR("MDTM: Failed to send message err :%s",
-                  strerror(errno));
-        return NCSCC_RC_FAILURE;
-    } else {
-        m_MDS_LOG_ERR("MDTM: Failed to send message send_len :%zd",
-                  send_len);
-        return NCSCC_RC_FAILURE;
+    if (mds_tipc_fctrl_trysend(buffer, buff_len, id) == NCSCC_RC_SUCCESS) {
+        send_len = sendto(tipc_cb.BSRsock, buffer, buff_len, 0,
+                    (struct sockaddr *)&server_addr, sizeof(server_addr));
+        if (send_len == buff_len) {
+            m_MDS_LOG_INFO("MDTM: Successfully sent message");
+            return NCSCC_RC_SUCCESS;
+        } else if (send_len == -1) {
+            m_MDS_LOG_ERR("MDTM: Failed to send message err :%s", strerror(errno)); +            // todo: it's failed to send msg, if flow control is enabled
+            // the msg needs to mark unsent
+            return NCSCC_RC_FAILURE;
+        } else {
+            m_MDS_LOG_ERR("MDTM: Failed to send message send_len :%zd", send_len); +            // todo: it's failed to send msg, if flow control is enabled
+            // the msg needs to mark unsent
+            return NCSCC_RC_FAILURE;
+        }
      }
+    return NCSCC_RC_SUCCESS;
  }
/*********************************************************
@@ -3103,12 +3171,12 @@ static uint32_t mdtm_mcast_sendto(void *buffer, size_t size,
      server_addr.addr.nameseq.lower = HTONL(MDS_MDTM_LOWER_INSTANCE);
      /*This can be scope-down to dest_svc_id  server_inst TBD*/
      server_addr.addr.nameseq.upper = HTONL(MDS_MDTM_UPPER_INSTANCE);
-
      int send_len =
          sendto(tipc_cb.BSRsock, buffer, size, 0,
             (struct sockaddr *)&server_addr, sizeof(server_addr));
+
      if (send_len == size) {
-        m_MDS_LOG_INFO("MDTM: Successfully sent message");
+        m_MDS_LOG_INFO("MDTM: Successfully sent mcast message");
          return NCSCC_RC_SUCCESS;
      } else {
          m_MDS_LOG_ERR("MDTM: Failed to send Multicast message err :%s", @@ -3151,7 +3219,7 @@ static uint32_t mdtm_add_mds_hdr(uint8_t *buffer, MDTM_SEND_REQ *req)
      uint8_t *ptr;
      ptr = buffer;
  -    prot_ver |= MDS_PROT | MDS_VERSION | ((uint8_t)(req->pri - 1));
+    prot_ver |= gl_mds_pro_ver | ((uint8_t)(req->pri - 1));
      enc_snd_type = (req->msg.encoding << 6);
      enc_snd_type |= (uint8_t)req->snd_type;
  diff --git a/src/mds/mds_tipc_fctrl_intf.cc b/src/mds/mds_tipc_fctrl_intf.cc
new file mode 100644
index 0000000..91b9107
--- /dev/null
+++ b/src/mds/mds_tipc_fctrl_intf.cc
@@ -0,0 +1,376 @@
+/*      -*- OpenSAF  -*-
+ *
+ * (C) Copyright 2019 The OpenSAF Foundation
+ *
+ * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed + * under the GNU Lesser General Public License Version 2.1, February 1999.
+ * The complete license can be accessed from the following location:
+ * http://opensource.org/licenses/lgpl-license.php
+ * See the Copying file included with the OpenSAF distribution for full
+ * licensing terms.
+ *
+ * Author(s): Ericsson AB
+ *
+ */
+#include "mds/mds_tipc_fctrl_intf.h"
+
+#include <poll.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <sys/poll.h>
+#include <unistd.h>
+
+#include <map>
+#include <mutex>
+
+#include "base/ncssysf_def.h"
+#include "base/ncssysf_tsk.h"
+
+#include "mds/mds_log.h"
+#include "mds/mds_tipc_fctrl_portid.h"
+#include "mds/mds_tipc_fctrl_msg.h"
+
+using mds::Event;
+using mds::TipcPortId;
+using mds::DataMessage;
+using mds::ChunkAck;
+using mds::HeaderMessage;
+
+namespace {
+// multicast/broadcast enabled
+// todo: to be removed if flow control support it
+bool is_mcast_enabled = true;
+
+// mailbox handles for events
+SYSF_MBX mbx_events;
+
+// ncs task handle
+NCSCONTEXT p_task_hdl = nullptr;
+
+// data socket associated with port id
+int data_sock_fd = 0;
+struct tipc_portid snd_rcv_portid;
+
+// socket receiver's buffer size
+// todo: This buffer size is read from the local node as an estimated
+// buffer size of the receiver sides, in facts that could be different
+// at the receiver's sides (in the other nodes). At this moment, we
+// assume all nodes in cluster have set the same tipc buffer size
+uint64_t sock_buf_size = 0;
+
+// map of key:unique id(adest), value: TipcPortId instance
+// unique id is derived from struct tipc_portid
+std::map<uint64_t, TipcPortId*> portid_map;
+std::mutex portid_map_mutex;
+
+// chunk ack parameters
+// todo: The chunk ack size should be configurable
+uint16_t kChunkAckSize = 3;
+
+// Returns the TipcPortId instance registered in portid_map for the given
+// tipc port id, or nullptr when there is no entry for it.
+// This function does not lock portid_map_mutex itself; the callers visible
+// in this file take the mutex before calling it.
+TipcPortId* portid_lookup(struct tipc_portid id) {
+  // One find() both tests for presence and yields the value; unlike
+  // operator[] it can never insert an entry as a side effect.
+  auto it = portid_map.find(TipcPortId::GetUniqueId(id));
+  return (it == portid_map.end()) ? nullptr : it->second;
+}
+
+uint32_t process_flow_event(const Event evt) {
[Vu] should we make evt be a pointer or a reference parameter?
[M]: No, it's meant to be unchanged
[Vu2] I guess it can be passed as a const pointer/reference, such as `uint32_t process_flow_event(const Event& evt) {`
+  uint32_t rc = NCSCC_RC_SUCCESS;
+  TipcPortId *portid = portid_lookup(evt.id_);
+  if (portid == nullptr) {
+    if (evt.type_ == Event::Type::kEvtRcvData) {
+      portid = new TipcPortId(evt.id_, data_sock_fd, kChunkAckSize,
+          sock_buf_size);
+      portid_map[TipcPortId::GetUniqueId(evt.id_)] = portid;
+      if (evt.type_ == Event::Type::kEvtRcvData) {
[Vu] Is this check duplicated and therefore unnecessary?
[M] Yes, will remove one
+        rc = portid->ReceiveData(evt.mseq_, evt.mfrag_,
+            evt.fseq_, evt.svc_id_);
+      }
+    }
[Vu] Should we log something in the else case, i.e. 'portid == nullptr' && evt.type_ != Event::Type::kEvtRcvData? It could help to detect unwanted messages or anything wrong in the encode/decode processing.
[M] Yes, will add error logging here
+  } else {
+    if (evt.type_ == Event::Type::kEvtRcvData) {
+      rc = portid->ReceiveData(evt.mseq_, evt.mfrag_,
+          evt.fseq_, evt.svc_id_);
+    }
+    if (evt.type_ == Event::Type::kEvtRcvChunkAck) {
+      portid->ReceiveChunkAck(evt.fseq_, evt.chunk_size_);
+    }
+    if (evt.type_ == Event::Type::kEvtSendChunkAck) {
+      portid->SendChunkAck(evt.fseq_, evt.svc_id_, evt.chunk_size_);
+    }
+    if (evt.type_ == Event::Type::kEvtDropData) {
+      portid->ReceiveNack(evt.mseq_, evt.mfrag_,
+          evt.fseq_);
+    }
+  }
+  return rc;
+}
+
+uint32_t process_all_events(void) {
+  enum { FD_FCTRL = 0, NUM_FDS };
+
+  int poll_tmo = MDTM_TIPC_POLL_TIMEOUT;
+  while (true) {
+    int pollres;
+    struct pollfd pfd[NUM_FDS] = {{0}};
+
+    pfd[FD_FCTRL].fd =
+        ncs_ipc_get_sel_obj(&mbx_events).rmv_obj;
+    pfd[FD_FCTRL].events = POLLIN;
+
+    pollres = poll(pfd, NUM_FDS, poll_tmo);
+
+    if (pollres == -1) {
+      if (errno == EINTR) continue;
+      m_MDS_LOG_ERR("FCTRL: poll() failed:%s", strerror(errno));
+      break;
+    }
+
+    if (pollres > 0) {
+      if (pfd[FD_FCTRL].revents == POLLIN) {
+        Event *evt = reinterpret_cast<Event*>(ncs_ipc_non_blk_recv(
+            &mbx_events));
+
+        if (evt == nullptr) continue;
+
+        portid_map_mutex.lock();
+        process_flow_event(*evt);
+        delete evt;
[Vu] Is it possible to move 'delete evt' inside the `process_flow_event`?
[M] "delete evt" should be done here, since this is the scope that owns the variable.
+        portid_map_mutex.unlock();
+      }
+    }
+  }  /* while */
+  return NCSCC_RC_SUCCESS;
+}
+
+// Creates and attaches the flow control mailbox (mbx_events) and creates the
+// task that runs process_all_events().
+//
+// task_hdl: address of the handle variable that receives the created task's
+//           handle (the caller in this file passes &p_task_hdl).
+// Returns NCSCC_RC_SUCCESS on success. On failure returns NCSCC_RC_FAILURE;
+// a mailbox that had already been created is released again before
+// returning.
+uint32_t create_ncs_task(void *task_hdl) {
+  int policy = SCHED_OTHER; /*root defaults */
+  int max_prio = sched_get_priority_max(policy);
+  int min_prio = sched_get_priority_min(policy);
+  int prio_val = ((max_prio - min_prio) * 0.87);
+
+  if (m_NCS_IPC_CREATE(&mbx_events) != NCSCC_RC_SUCCESS) {
+    m_MDS_LOG_ERR("m_NCS_IPC_CREATE failed");
+    return NCSCC_RC_FAILURE;
+  }
+  if (m_NCS_IPC_ATTACH(&mbx_events) != NCSCC_RC_SUCCESS) {
+    m_MDS_LOG_ERR("m_NCS_IPC_ATTACH failed");
+    // Do not leak the mailbox created above.
+    m_NCS_IPC_RELEASE(&mbx_events, nullptr);
+    return NCSCC_RC_FAILURE;
+  }
+  // task_hdl already is the address of the caller's handle variable, so it
+  // is handed on as-is. Passing &task_hdl would store the new handle in this
+  // function's own parameter copy and leave the caller's variable unset.
+  if (ncs_task_create((NCS_OS_CB)process_all_events, 0,
+          "OSAF_MDS", prio_val, policy, NCS_MDTM_STACKSIZE,
+          static_cast<void **>(task_hdl)) != NCSCC_RC_SUCCESS) {
+    m_MDS_LOG_ERR("FCTRL: Task Creation-failed:\n");
+    // Do not leak the mailbox created above.
+    m_NCS_IPC_RELEASE(&mbx_events, nullptr);
+    return NCSCC_RC_FAILURE;
+  }
+
+  m_MDS_LOG_NOTIFY("FCTRL: Start process_all_events");
+  return NCSCC_RC_SUCCESS;
+}
+
+}  // end local namespace
+
+uint32_t mds_tipc_fctrl_initialize(int dgramsock, struct tipc_portid id,
+    uint64_t rcv_buf_size, bool mcast_enabled) {
+  if (create_ncs_task(&p_task_hdl) !=
+      NCSCC_RC_SUCCESS) {
[Vu] Should check and free p_task_hdl if it has been allocated.
[M]: As in previous comment
+    m_MDS_LOG_ERR("FCTRL: Start of the Created Task-failed:\n");
+    return NCSCC_RC_FAILURE;
+  }
+  data_sock_fd = dgramsock;
+  snd_rcv_portid = id;
+  sock_buf_size = rcv_buf_size;
+  is_mcast_enabled = mcast_enabled;
+
+  m_MDS_LOG_NOTIFY("FCTRL: Initialize [node:%x, ref:%u]",
+      id.node, id.ref);
+
+  return NCSCC_RC_SUCCESS;
+}
+
+// Releases the task handle stored in p_task_hdl through ncs_task_release().
+// A failed release is only logged; NCSCC_RC_SUCCESS is returned in every
+// case.
+uint32_t mds_tipc_fctrl_shutdown(void) {
+  const bool released = (ncs_task_release(p_task_hdl) == NCSCC_RC_SUCCESS);
+  if (!released) {
+    m_MDS_LOG_ERR("FCTRL: Stop of the Created Task-failed:\n");
+  }
+  return NCSCC_RC_SUCCESS;
+}
+
+uint32_t mds_tipc_fctrl_sndqueue_capable(struct tipc_portid id, uint16_t len,
+          uint16_t* next_seq) {
[Vu] Should we use assert in every place to make sure the pre-condition/post-condition must be met?
e.g assert(next_seq)
[M] In general practice, the condition should be checked from a caller outside of mds. I think we should skip assert() here in mds internally as it's called many times during the data transmission
+  uint32_t rc = NCSCC_RC_SUCCESS;
+
+  portid_map_mutex.lock();
+
+  TipcPortId *portid = portid_lookup(id);
+  if (portid == nullptr) {
+    m_MDS_LOG_ERR("FCTRL: PortId not found [node:%x, ref:%u] line:%u",
+        id.node, id.ref, __LINE__);
+    rc = NCSCC_RC_FAILURE;
+  } else {
+    // assign the sequence number of the outgoing message
+    *next_seq = portid->GetCurrentSeq();
+  }
+
+  portid_map_mutex.unlock();
+
+  return rc;
+}
+
+// Queues a copy of the outgoing message @buffer (@len bytes) on the send
+// queue of the TipcPortId registered for @id.
+// Returns NCSCC_RC_FAILURE when no such portid is known.
+uint32_t mds_tipc_fctrl_trysend(const uint8_t *buffer, uint16_t len,
+    struct tipc_portid id) {
+  uint32_t result = NCSCC_RC_FAILURE;
+
+  portid_map_mutex.lock();
+
+  TipcPortId *entry = portid_lookup(id);
+  if (entry != nullptr) {
+    // the value returned by Queue() is not consulted here
+    entry->Queue(buffer, len);
+    result = NCSCC_RC_SUCCESS;
+  } else {
+    m_MDS_LOG_ERR("FCTRL: PortId not found [node:%x, ref:%u] line:%u",
+        id.node, id.ref, __LINE__);
[Vu] Missed to unlock the mutex here?
[M] It's not missed, it's called before the function returns
+  }
+
+  portid_map_mutex.unlock();
+
+  return result;
+}
+
+// Registers a service on the tipc portid @id. If the portid is not in
+// portid_map yet, a TipcPortId is created for it; otherwise only its service
+// counter svc_cnt_ is incremented. The service id is taken from @type with
+// MDS_EVENT_MASK_FOR_SVCID and is used for logging only.
+// Always returns NCSCC_RC_SUCCESS.
+uint32_t mds_tipc_fctrl_portid_up(struct tipc_portid id, uint32_t type) {
+  const MDS_SVC_ID svc_id = (uint16_t)(type & MDS_EVENT_MASK_FOR_SVCID);
+
+  portid_map_mutex.lock();
+
+  TipcPortId *entry = portid_lookup(id);
+  if (entry != nullptr) {
+    // portid already tracked: one more service on it
+    entry->svc_cnt_++;
+    m_MDS_LOG_NOTIFY("FCTRL: Add svc[node:%x, ref:%u svc_id:%u], svc_cnt:%u",
+        id.node, id.ref, svc_id, entry->svc_cnt_);
+  } else {
+    // first time this portid is seen: add it to the map
+    entry = new TipcPortId(id, data_sock_fd, kChunkAckSize, sock_buf_size);
+    portid_map[TipcPortId::GetUniqueId(id)] = entry;
+    m_MDS_LOG_NOTIFY("FCTRL: Add portid[node:%x, ref:%u svc_id:%u], svc_cnt:%u",
+        id.node, id.ref, svc_id, entry->svc_cnt_);
+  }
+
+  portid_map_mutex.unlock();
+
+  return NCSCC_RC_SUCCESS;
+}
+
+// Unregisters a service from the tipc portid @id by decrementing svc_cnt_ of
+// its TipcPortId. The entry itself stays in portid_map; an unknown portid is
+// ignored. The service id taken from @type is used for logging only.
+// Always returns NCSCC_RC_SUCCESS.
+uint32_t mds_tipc_fctrl_portid_down(struct tipc_portid id, uint32_t type) {
+  const MDS_SVC_ID svc_id = (uint16_t)(type & MDS_EVENT_MASK_FOR_SVCID);
+
+  portid_map_mutex.lock();
+
+  TipcPortId *entry = portid_lookup(id);
+  if (entry != nullptr) {
+    // one service less on this portid
+    --entry->svc_cnt_;
+    m_MDS_LOG_NOTIFY("FCTRL: Remove svc[node:%x, ref:%u svc_id:%u], svc_cnt:%u",
+        id.node, id.ref, svc_id, entry->svc_cnt_);
+  }
+
+  portid_map_mutex.unlock();
+
+  return NCSCC_RC_SUCCESS;
+}
+
+// Removes the TipcPortId of @id from portid_map and destroys it. An unknown
+// portid is ignored. Always returns NCSCC_RC_SUCCESS.
+uint32_t mds_tipc_fctrl_portid_terminate(struct tipc_portid id) {
+  portid_map_mutex.lock();
+
+  TipcPortId *entry = portid_lookup(id);
+  if (entry != nullptr) {
+    // drop the map entry, then release the object it pointed to
+    portid_map.erase(TipcPortId::GetUniqueId(id));
+    delete entry;
+    m_MDS_LOG_NOTIFY("FCTRL: Remove portid[node:%x, ref:%u]", id.node, id.ref);
+  }
+
+  portid_map_mutex.unlock();
+
+  return NCSCC_RC_SUCCESS;
+}
+
+uint32_t mds_tipc_fctrl_drop_data(uint8_t *buffer, uint16_t len,
+    struct tipc_portid id) {
+  HeaderMessage header;
+  header.Decode(buffer);
+  // if mds support flow control
+  if ((header.pro_ver_ & MDS_PROT_VER_MASK) == MDS_PROT_FCTRL) {
+    if (header.pro_id_ == MDS_PROT_FCTRL_ID) {
+      if (header.msg_type_ == ChunkAck::kChunkAckMsgType) {
+        // receive single ack message
+        ChunkAck ack;
+        ack.Decode(buffer);
+        // send to the event thread
+        if (m_NCS_IPC_SEND(&mbx_events,
+            new Event(Event::Type::kEvtSendChunkAck, id, ack.svc_id_,
+                header.mseq_, header.mfrag_, ack.acked_fseq_, ack.chunk_size_),
+            NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) {
+          m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events\n");
+          return NCSCC_RC_FAILURE;
+        }
+        return NCSCC_RC_SUCCESS;
+      }
+    } else {
+      // receive data message
+      DataMessage data;
+      data.Decode(buffer);
+      // send to the event thread
+      if (m_NCS_IPC_SEND(&mbx_events,
+          new Event(Event::Type::kEvtDropData, id, data.svc_id_,
+              header.mseq_, header.mfrag_, header.fseq_),
+          NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) {
+        m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events\n");
+        return NCSCC_RC_FAILURE;
+      }
+      return NCSCC_RC_SUCCESS;
+    }
+  }
+
+  return NCSCC_RC_SUCCESS;
+}
+
+uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t len,
+    struct tipc_portid id) {
+  HeaderMessage header;
+  header.Decode(buffer);
+  // if mds support flow control
+  if ((header.pro_ver_ & MDS_PROT_VER_MASK) == MDS_PROT_FCTRL) {
+    if (header.pro_id_ == MDS_PROT_FCTRL_ID) {
+      if (header.msg_type_ == ChunkAck::kChunkAckMsgType) {
+        // receive single ack message
+        ChunkAck ack;
+        ack.Decode(buffer);
+        // send to the event thread
+        if (m_NCS_IPC_SEND(&mbx_events,
+            new Event(Event::Type::kEvtRcvChunkAck, id, ack.svc_id_,
+                header.mseq_, header.mfrag_, ack.acked_fseq_, ack.chunk_size_),
+                NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) {
+          m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events\n");
+        }
+        // return NCSCC_RC_FAILURE, so the tipc receiving thread (legacy) will
+        // skip this data msg
+        return NCSCC_RC_FAILURE;
+      }
+    } else {
+      // receive data message
+      DataMessage data;
+      data.Decode(buffer);
+      // todo: skip mcast/bcast, revisit
+      if ((data.snd_type_ == MDS_SENDTYPE_BCAST ||
+          data.snd_type_ == MDS_SENDTYPE_RBCAST) && is_mcast_enabled) {
+        return NCSCC_RC_SUCCESS;
+      }
+      portid_map_mutex.lock();
[Vu] Should consider moving the lock/unlock inside 'process_flow_event'? and that function may take time to process. what if the mutex is acquired long? Any way to narrow the scope of the mutex acquired?
[M] The next patches that adds process_timer_event() would need the mutex too. It looks like the mutex is lowering down the performance of all tipc port id's transmission (as it protects all tipc port id). For now it is not doing much except examining the flow control sequence, and we still have the mds global mutex gl_mds_library_mutex.
+      uint32_t rc = process_flow_event(Event(Event::Type::kEvtRcvData, +          id, data.svc_id_, header.mseq_, header.mfrag_, header.fseq_));
+      portid_map_mutex.unlock();
+      return rc;
+    }
+  }
+  return NCSCC_RC_SUCCESS;
+}
diff --git a/src/mds/mds_tipc_fctrl_intf.h b/src/mds/mds_tipc_fctrl_intf.h
new file mode 100644
index 0000000..85a058f
--- /dev/null
+++ b/src/mds/mds_tipc_fctrl_intf.h
@@ -0,0 +1,47 @@
+/*      -*- OpenSAF  -*-
+ *
+ * (C) Copyright 2019 The OpenSAF Foundation
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed
+ * under the GNU Lesser General Public License Version 2.1, February 1999.
+ * The complete license can be accessed from the following location:
+ * http://opensource.org/licenses/lgpl-license.php
+ * See the Copying file included with the OpenSAF distribution for full
+ * licensing terms.
+ *
+ * Author(s): Ericsson AB
+ *
+ */
+
+#ifndef MDS_MDS_TIPC_FCTRL_INTF_H_
+#define MDS_MDS_TIPC_FCTRL_INTF_H_
+
+#include <linux/tipc.h>
+#include <stdbool.h>
+#include <stdint.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+// Sets up flow control for the tipc socket @dgramsock owned by portid @id;
+// @rcv_buf_size and @mbrcast_enabled are remembered for later use.
+uint32_t mds_tipc_fctrl_initialize(int dgramsock, struct tipc_portid id,
+    uint64_t rcv_buf_size, bool mbrcast_enabled);
+// Releases the flow control task. Always returns NCSCC_RC_SUCCESS.
+uint32_t mds_tipc_fctrl_shutdown(void);
+// Inspects a message received from @id. NCSCC_RC_FAILURE tells the caller to
+// skip the message (chunk acks, or a failed flow control check).
+uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t len,
+    struct tipc_portid id);
+// A service came up on portid @id: start tracking the portid, or count one
+// more service on it.
+uint32_t mds_tipc_fctrl_portid_up(struct tipc_portid id, uint32_t type);
+// A service went down on portid @id: count one service less on it.
+uint32_t mds_tipc_fctrl_portid_down(struct tipc_portid id, uint32_t type);
+// Stops tracking portid @id and frees its flow control state.
+uint32_t mds_tipc_fctrl_portid_terminate(struct tipc_portid id);
+// Handles a message that was not delivered to @id and is handed back in
+// @buffer.
+uint32_t mds_tipc_fctrl_drop_data(uint8_t *buffer, uint16_t len,
+    struct tipc_portid id);
+// Stores in *next_seq the flow control sequence number for the next message
+// to @id. Fails when @id is not tracked.
+uint32_t mds_tipc_fctrl_sndqueue_capable(struct tipc_portid id, uint16_t len,
+    uint16_t* next_seq);
+// Queues a copy of the outgoing message @buffer for @id. Fails when @id is
+// not tracked.
+uint32_t mds_tipc_fctrl_trysend(const uint8_t *buffer, uint16_t len,
+    struct tipc_portid id);
+#ifdef __cplusplus
+}
+#endif
+
+#endif  // MDS_MDS_TIPC_FCTRL_INTF_H_
diff --git a/src/mds/mds_tipc_fctrl_msg.cc b/src/mds/mds_tipc_fctrl_msg.cc
new file mode 100644
index 0000000..abd38d3
--- /dev/null
+++ b/src/mds/mds_tipc_fctrl_msg.cc
@@ -0,0 +1,142 @@
+/*      -*- OpenSAF  -*-
+ *
+ * (C) Copyright 2019 The OpenSAF Foundation
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed
+ * under the GNU Lesser General Public License Version 2.1, February 1999.
+ * The complete license can be accessed from the following location:
+ * http://opensource.org/licenses/lgpl-license.php
+ * See the Copying file included with the OpenSAF distribution for full
+ * licensing terms.
+ *
+ * Author(s): Ericsson AB
+ *
+ */
+
+#include "mds/mds_tipc_fctrl_msg.h"
+#include "base/ncssysf_def.h"
+
+namespace mds {
+// Builds a header with the given length, message sequence number, fragment
+// number and flow control sequence number. The protocol version is set to
+// MDS_PROT_FCTRL; protocol id and message type start at 0.
+HeaderMessage::HeaderMessage(uint16_t msg_len, uint32_t mseq,
+    uint16_t mfrag, uint16_t fseq)
+    : msg_len_(msg_len),
+      mseq_(mseq),
+      mfrag_(mfrag),
+      fseq_(fseq),
+      pro_ver_(MDS_PROT_FCTRL),
+      pro_id_(0),
+      msg_type_(0) {}
+
+// Writes the header fields into the raw message @msg: length at octet 0,
+// sequence number at octet 2, fragment number at octet 6 and the protocol
+// version (always MDS_PROT_FCTRL) at octet 10. Octets 8-9 are not written.
+void HeaderMessage::Encode(uint8_t *msg) {
+  uint8_t *cursor = msg;
+
+  // octets 0-1: message length
[Vu] Should we make these constant index e.g: 0, 2, 6, 10, etc. into descriptive enummeric elements?
e.g:
enum kBitIndex {
    kMsgLengthIndex = 0,
    kSeqNumberIndex = 2,
    kFragmentIndex = 6,
    ...
};
[M]: Yes, I will add these enum
+  ncs_encode_16bit(&cursor, msg_len_);
+  // octets 2-5: sequence number
+  cursor = msg + 2;
+  ncs_encode_32bit(&cursor, mseq_);
+  // octets 6-7: fragment number
+  cursor = msg + 6;
+  ncs_encode_16bit(&cursor, mfrag_);
+  // octets 8-9 (length_check) are skipped
+  // octet 10: protocol version
+  cursor = msg + 10;
+  ncs_encode_8bit(&cursor, MDS_PROT_FCTRL);
+}
+
+// Reads the header fields from the raw message @msg.
+// Length, sequence number, fragment number and protocol version are always
+// decoded. With the flow control version, the flow control sequence number
+// and the protocol id follow, and the message type is read only when the
+// protocol id is MDS_PROT_FCTRL_ID. Without it, a message with a non-zero
+// fragment number still has octets 8-9 read as flow control sequence number,
+// and a non-zero value there marks the header as MDS_PROT_FCTRL.
+void HeaderMessage::Decode(uint8_t *msg) {
+  uint8_t *cursor = msg;
+
+  // octets 0-1: message length
+  msg_len_ = ncs_decode_16bit(&cursor);
+  // octets 2-5: sequence number
+  cursor = msg + 2;
+  mseq_ = ncs_decode_32bit(&cursor);
+  // octets 6-7: fragment number
+  cursor = msg + 6;
+  mfrag_ = ncs_decode_16bit(&cursor);
+  // octet 10: protocol version
+  cursor = msg + 10;
+  pro_ver_ = ncs_decode_8bit(&cursor);
+
+  if ((pro_ver_ & MDS_PROT_VER_MASK) != MDS_PROT_FCTRL) {
+    if (mfrag_ == 0) {
+      return;
+    }
+    cursor = msg + 8;
+    fseq_ = ncs_decode_16bit(&cursor);
+    if (fseq_ != 0) {
+      pro_ver_ = MDS_PROT_FCTRL;
+    }
+    return;
+  }
+
+  // octets 8-9: flow control sequence number
+  cursor = msg + 8;
+  fseq_ = ncs_decode_16bit(&cursor);
+  // octets 11-14: protocol identifier
+  cursor = msg + 11;
+  pro_id_ = ncs_decode_32bit(&cursor);
+  if (pro_id_ == MDS_PROT_FCTRL_ID) {
+    // octet 15: message type
+    cursor = msg + 15;
+    msg_type_ = ncs_decode_8bit(&cursor);
+  }
+}
+
+// Reads the receiver service id and the send type of a data message from
+// the raw message @msg. The send type is the low 6 bits of octet 17.
+// header_ is not touched here.
+void DataMessage::Decode(uint8_t *msg) {
+  // service id
+  const auto svc_id_at = MDTM_PKT_TYPE_OFFSET +
+                         MDTM_FRAG_HDR_LEN +
+                         MDS_HEADER_RCVR_SVC_ID_POSITION;
+  uint8_t *cursor = &msg[svc_id_at];
+  svc_id_ = ncs_decode_16bit(&cursor);
+
+  // snd_type
+  cursor = msg + 17;
+  const uint8_t raw_type = ncs_decode_8bit(&cursor);
+  snd_type_ = raw_type & 0x3f;
+}
+
+// Releases the payload copy owned by this message. msg_data_ is allocated
+// with new[] (see TipcPortId::Queue), so it must be released with delete[];
+// plain delete on such a buffer is undefined behaviour.
+DataMessage::~DataMessage() {
+  // delete[] on a null pointer is a no-op, no guard needed
+  delete[] msg_data_;
+  msg_data_ = nullptr;
+}
+
+// Builds a chunk ack for service @svc_id that acknowledges @chunk_size
+// messages up to flow control sequence number @fseq.
+ChunkAck::ChunkAck(uint16_t svc_id, uint16_t fseq, uint16_t chunk_size)
+    : msg_type_(kChunkAckMsgType),
+      svc_id_(svc_id),
+      acked_fseq_(fseq),
+      chunk_size_(chunk_size) {}
+
+// Writes the chunk ack fields into the raw message @msg: protocol id at
+// octet 11, message type at 15, service id at 16, acked sequence number at
+// 18 and chunk size at 20. Octets 0-10 are not written here.
+void ChunkAck::Encode(uint8_t *msg) {
+  // octets 11-14: protocol identifier
+  uint8_t *cursor = msg + 11;
+  ncs_encode_32bit(&cursor, MDS_PROT_FCTRL_ID);
+  // octet 15: message type
+  cursor = msg + 15;
+  ncs_encode_8bit(&cursor, kChunkAckMsgType);
+  // octets 16-17: service id
+  cursor = msg + 16;
+  ncs_encode_16bit(&cursor, svc_id_);
+  // octets 18-19: flow control sequence number being acked
+  cursor = msg + 18;
+  ncs_encode_16bit(&cursor, acked_fseq_);
+  // octets 20-21: chunk size
+  cursor = msg + 20;
+  ncs_encode_16bit(&cursor, chunk_size_);
+}
+
+// Reads service id (octet 16), acked flow control sequence number (octet 18)
+// and chunk size (octet 20) from the raw message @msg.
+void ChunkAck::Decode(uint8_t *msg) {
+  // octets 16-17: service id
+  uint8_t *cursor = msg + 16;
+  svc_id_ = ncs_decode_16bit(&cursor);
+  // octets 18-19: flow control sequence number being acked
+  cursor = msg + 18;
+  acked_fseq_ = ncs_decode_16bit(&cursor);
+  // octets 20-21: chunk size
+  cursor = msg + 20;
+  chunk_size_ = ncs_decode_16bit(&cursor);
+}
+
+}  // end namespace mds
diff --git a/src/mds/mds_tipc_fctrl_msg.h b/src/mds/mds_tipc_fctrl_msg.h
new file mode 100644
index 0000000..677f256
--- /dev/null
+++ b/src/mds/mds_tipc_fctrl_msg.h
@@ -0,0 +1,129 @@
+/*      -*- OpenSAF  -*-
+ *
+ * (C) Copyright 2019 The OpenSAF Foundation
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed
+ * under the GNU Lesser General Public License Version 2.1, February 1999.
+ * The complete license can be accessed from the following location:
+ * http://opensource.org/licenses/lgpl-license.php
+ * See the Copying file included with the OpenSAF distribution for full
+ * licensing terms.
+ *
+ * Author(s): Ericsson AB
+ *
+ */
+
+#ifndef MDS_MDS_TIPC_FCTRL_MSG_H_
+#define MDS_MDS_TIPC_FCTRL_MSG_H_
+
+#include <linux/tipc.h>
+#include "base/ncssysf_ipc.h"
+#include "base/ncssysf_tmr.h"
+#include "mds/mds_dt.h"
+
+namespace mds {
+
+class Event {
+ public:
+  enum class Type {
+    kEvtPortIdAll = 0,
+    kEvtPortIdUp,     // event of new portid published (not used)
+    kEvtPortIdDown,   // event of portid widthdrawn (not used)
+
+    kEvtDataFlowAll,
+    kEvtRcvData,       // event that received data msg from peer
+    kEvtSendChunkAck,  // event to send the ack for a chunk of N accumulative
+                       // data msgs (N>=1)
+    kEvtRcvChunkAck,   // event that received the ack for a chunk of N
+                       // accumulative data msgs (N>=1)
+    kEvtSendSelectiveAck,  // event to send the ack for a number of selective
+                           // data msgs (not supported)
+    kEvtRcvSelectiveAck,   // event that received the ack for a numer of
+                           // selective data msgs (not supported)
+    kEvtDropData,          // event reported from tipc that a message is not
+                           // delivered
+  };
+  NCS_IPC_MSG next_{0};
+  Type type_;
+
+  // Used for flow event only
+  struct tipc_portid id_;
+  uint16_t svc_id_{0};
+  uint32_t mseq_{0};
+  uint16_t mfrag_{0};
+  uint16_t fseq_{0};
+  uint16_t chunk_size_{1};
+  explicit Event(Type type):type_(type) {}
+  Event(Type type, struct tipc_portid id, uint16_t svc_id,
+      uint32_t mseq, uint16_t mfrag, uint16_t f_seg_num):
+    id_(id), svc_id_(svc_id),
+    mseq_(mseq), mfrag_(mfrag), fseq_(f_seg_num) {
+    type_ = type;
+  }
+  Event(Type type, struct tipc_portid id, uint16_t svc_id, uint32_t mseq,
+      uint16_t mfrag, uint16_t f_seg_num, uint16_t chunk_size):
+    id_(id), svc_id_(svc_id), mseq_(mseq), mfrag_(mfrag),
+    fseq_(f_seg_num), chunk_size_(chunk_size) {
+    type_ = type;
+  }
+};
+
+// Common base of the flow control messages. Both operations work on a raw
+// message buffer and do nothing unless a derived class overrides them.
+class BaseMessage {
+ public:
+  virtual ~BaseMessage() = default;
+  // Fill this object from the raw bytes at @msg.
+  virtual void Decode(uint8_t *msg) {}
+  // Write this object into the raw bytes at @msg.
+  virtual void Encode(uint8_t *msg) {}
+};
+
+// Header fields shared by data and flow control messages. The octet
+// positions below are the ones used by Decode().
+class HeaderMessage: public BaseMessage {
+ public:
+  uint8_t* msg_ptr_{nullptr};
+  uint16_t msg_len_{0};   // octets 0-1: message length
+  uint32_t mseq_{0};      // octets 2-5: message sequence number
+  uint16_t mfrag_{0};     // octets 6-7: fragment number
+  uint16_t fseq_{0};      // octets 8-9: flow control sequence number
+  uint8_t pro_ver_{0};    // octet 10: protocol version
+  uint32_t pro_id_{0};    // octets 11-14: protocol identifier
+  uint16_t msg_type_{0};  // octet 15: message type
+
+  HeaderMessage() = default;
+  HeaderMessage(uint16_t msg_len, uint32_t mseq, uint16_t mfrag,
+      uint16_t fseq);
+  ~HeaderMessage() override = default;
+
+  void Decode(uint8_t *msg) override;
+  void Encode(uint8_t *msg) override;
+};
+
+// A data message as kept in the send queue. Decode() fills svc_id_ and
+// snd_type_ only; header_ is decoded separately by the user of the class.
+// The destructor releases msg_data_.
+// NOTE(review): the class is copyable although it owns msg_data_; a copy
+// would release the same buffer twice - confirm that no copies are made.
+class DataMessage: public BaseMessage {
+ public:
+  HeaderMessage header_;
+  uint16_t svc_id_{0};  // receiver service id
+
+  uint8_t* msg_data_{nullptr};  // owned copy of the raw message, may be null
+  uint8_t snd_type_{0};         // send type, low 6 bits of octet 17
+
+  DataMessage() = default;
+  ~DataMessage() override;
+  void Decode(uint8_t *msg) override;
+};
+
+// Acknowledgement for a chunk of chunk_size_ data messages ending at flow
+// control sequence number acked_fseq_.
+class ChunkAck: public BaseMessage {
+ public:
+  static const uint8_t kChunkAckMsgType = 1;     // value of the msg type octet
+  static const uint16_t kChunkAckMsgLength = 22;  // size of an encoded ack
+
+  uint8_t msg_type_{0};
+  uint16_t svc_id_{0};
+  uint16_t acked_fseq_{0};
+  uint16_t chunk_size_{1};
+
+  ChunkAck() = default;
+  ChunkAck(uint16_t svc_id, uint16_t fseq, uint16_t chunk_size);
+  ~ChunkAck() override = default;
+
+  void Encode(uint8_t *msg) override;
+  void Decode(uint8_t *msg) override;
+};
+
+}  // end namespace mds
+
+#endif  // MDS_MDS_TIPC_FCTRL_MSG_H_
diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc
new file mode 100644
index 0000000..24d13ee
--- /dev/null
+++ b/src/mds/mds_tipc_fctrl_portid.cc
@@ -0,0 +1,261 @@
+/*      -*- OpenSAF  -*-
+ *
+ * (C) Copyright 2019 The OpenSAF Foundation
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed
+ * under the GNU Lesser General Public License Version 2.1, February 1999.
+ * The complete license can be accessed from the following location:
+ * http://opensource.org/licenses/lgpl-license.php
+ * See the Copying file included with the OpenSAF distribution for full
+ * licensing terms.
+ *
+ * Author(s): Ericsson AB
+ *
+ */
+
+#include "mds/mds_tipc_fctrl_portid.h"
+#include "base/ncssysf_def.h"
+
+#include "mds/mds_dt.h"
+#include "mds/mds_log.h"
+
+namespace mds {
+
+// Appends @msg to the end of the queue. The queue deletes the messages it
+// holds in Erase() and Clear().
+void MessageQueue::Queue(DataMessage* msg) {
[Vu] should make msg to be an constant pointer parameter?
[M]: The msg has additional field is_sent to be changed during the transmission in next patches
+  queue_.emplace_back(msg);
+}
+
+// Returns the first queued message whose header carries sequence number
+// @mseq and fragment number @mfrag, or nullptr when there is none.
+// The message stays in the queue.
+DataMessage* MessageQueue::Find(uint32_t mseq, uint16_t mfrag) {
+  auto pos = queue_.begin();
+  while (pos != queue_.end()) {
[Vu] should use shorter version (const auto& it: queue_) ?
[M]: Yes, will update it
+    DataMessage *candidate = *pos;
+    const HeaderMessage &hdr = candidate->header_;
+    if (hdr.mseq_ == mseq && hdr.mfrag_ == mfrag) {
+      return candidate;
+    }
+    ++pos;
+  }
+  return nullptr;
+}
+
+// Removes and deletes every queued message whose flow control sequence
+// number lies in [@fseq_from, @fseq_to]. Returns the sum of the msg_len_
+// header fields of the removed messages. Nothing is removed when
+// @fseq_from is greater than @fseq_to.
+uint64_t MessageQueue::Erase(uint16_t fseq_from, uint16_t fseq_to) {
+  uint64_t freed_len = 0;
+  auto pos = queue_.begin();
+  while (pos != queue_.end()) {
+    DataMessage *candidate = *pos;
+    const uint16_t seq = candidate->header_.fseq_;
+    const bool in_range = (fseq_from <= seq) && (seq <= fseq_to);
+    if (!in_range) {
+      ++pos;
+      continue;
+    }
+    freed_len += candidate->header_.msg_len_;
+    // erase() hands back the position that follows the removed element
+    pos = queue_.erase(pos);
+    delete candidate;
+  }
+  return freed_len;
+}
+
+// Deletes every queued message and leaves the queue empty.
+void MessageQueue::Clear() {
+  for (DataMessage *queued : queue_) {
+    delete queued;
+  }
+  queue_.clear();
+}
+
+// Creates the flow control state for the peer portid @id. @sock is the tipc
+// socket used to send to the peer, @chksize the number of messages per
+// chunk ack and @sock_buf_size the estimated receive buffer size.
+TipcPortId::TipcPortId(struct tipc_portid id, int sock, uint16_t chksize,
+    uint64_t sock_buf_size)
+    : id_(id),
+      bsrsock_(sock),
+      chunk_size_(chksize),
+      rcv_buf_size_(sock_buf_size) {}
+
+// Deletes every message that is still waiting in the send queue.
+TipcPortId::~TipcPortId() {
+  sndqueue_.Clear();  // sndqueue_ holds raw pointers, free them explicitly
+}
+
+// Builds the 64-bit key of a portid: node in the upper 32 bits, ref in the
+// lower 32 bits. This uid is equivalent to the mds adest.
+uint64_t TipcPortId::GetUniqueId(struct tipc_portid id) {
+  const uint64_t node_part = static_cast<uint64_t>(id.node) << 32;
+  const uint64_t ref_part = static_cast<uint64_t>(id.ref);
+  return node_part | ref_part;
+}
+
+// Sends @length bytes at @data to this portid through bsrsock_, addressing
+// the peer by its tipc portid. Returns NCSCC_RC_SUCCESS only when sendto()
+// reports exactly @length bytes; anything else is logged with the current
+// errno text and reported as NCSCC_RC_FAILURE.
+uint32_t TipcPortId::Send(uint8_t* data, uint16_t length) {
+  struct sockaddr_tipc peer_addr;
+  memset(&peer_addr, 0, sizeof(peer_addr));
+  peer_addr.family = AF_TIPC;
+  peer_addr.addrtype = TIPC_ADDR_ID;
+  peer_addr.addr.id = id_;
+
+  const ssize_t sent = sendto(bsrsock_, data, length, 0,
+        (struct sockaddr *)&peer_addr, sizeof(peer_addr));
+  if (sent != length) {
+    m_MDS_LOG_ERR("sendto() err :%s", strerror(errno));
+    return NCSCC_RC_FAILURE;
+  }
+  return NCSCC_RC_SUCCESS;
+}
+
+// Records an outgoing message: decodes its header and data fields, keeps a
+// private copy of the @length raw bytes in the send queue, advances the next
+// send sequence number and adds @length to the unacknowledged byte count.
+// Always returns NCSCC_RC_SUCCESS.
+uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length) {
+  // Decode() takes a non-const pointer
+  uint8_t *raw = const_cast<uint8_t*>(data);
+
+  DataMessage *queued = new DataMessage;
+  queued->header_.Decode(raw);
+  queued->Decode(raw);
+  queued->msg_data_ = new uint8_t[length];
+  memcpy(queued->msg_data_, data, length);
+
+  sndqueue_.Queue(queued);
+  sndwnd_.send_++;
+  sndwnd_.nacked_space_ += length;
+  m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
+      "SndData[mseq:%u, mfrag:%u, fseq:%u, len:%u], "
+      "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
+      id_.node, id_.ref,
+      queued->header_.mseq_, queued->header_.mfrag_, queued->header_.fseq_,
+      length,
+      sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
+
+  return NCSCC_RC_SUCCESS;
+}
+
+// Placeholder: reports that the peer can always take @sending_len more
+// bytes. The argument is not evaluated.
+bool TipcPortId::ReceiveCapable(uint16_t sending_len) {
+  (void)sending_len;
+  return true;
+}
+
+// Builds a chunk ack that acknowledges @chksize messages up to flow control
+// sequence number @fseq for service @svc_id and sends it to this portid.
+// The result of Send() is not checked.
+void TipcPortId::SendChunkAck(uint16_t fseq, uint16_t svc_id,
+    uint16_t chksize) {
+  // Zero-filled: octets 8-9 are written by neither Encode() below, and
+  // without this they would go out as uninitialized stack bytes.
+  uint8_t data[ChunkAck::kChunkAckMsgLength] = {0};
+
+  // header fields first, then the ack fields behind them
+  HeaderMessage header(ChunkAck::kChunkAckMsgLength, 0, 0, fseq);
+  header.Encode(data);
+
+  ChunkAck sack(svc_id, fseq, chksize);
+  sack.Encode(data);
+  Send(data, ChunkAck::kChunkAckMsgLength);
+  m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
+      "SndChkAck[fseq:%u, chunk:%u]",
+      id_.node, id_.ref,
+      fseq, chksize);
+}
+
+// Checks the flow control sequence number @fseq of a data message received
+// from this portid against the receive window. @mseq and @mfrag are used
+// for logging only.
+// - fseq is exactly rcv_ + 1 and not acked yet: the message is accepted and,
+//   once chunk_size_ messages are unacknowledged, a chunk ack is sent.
+// - fseq is further ahead: when nothing has been received so far (rcv_ and
+//   acked_ are both 0) rcv_ is set to fseq, otherwise message loss is
+//   reported.
+// - fseq <= acked_: an unexpected retransmission is reported.
+// Returns NCSCC_RC_FAILURE for message loss and unexpected retransmission,
+// NCSCC_RC_SUCCESS in every other case.
+uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
+    uint16_t fseq, uint16_t svc_id) {
+  uint32_t rc = NCSCC_RC_SUCCESS;
+  // update receiver sequence window
+  if (rcvwnd_.acked_ < fseq && rcvwnd_.rcv_ + 1 == fseq) {
+    m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
+        "RcvData[mseq:%u, mfrag:%u, fseq:%u], "
+        "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "]",
+        id_.node, id_.ref,
+        mseq, mfrag, fseq,
+        rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_);
+
+    ++rcvwnd_.rcv_;
+    if (rcvwnd_.rcv_ - rcvwnd_.acked_ >= chunk_size_) {
+      // send ack for @chunk_size_ msgs starting from fseq
+      SendChunkAck(fseq, svc_id, chunk_size_);
+      rcvwnd_.acked_ = rcvwnd_.rcv_;
+    }
+  } else {
+    // todo: update rcvwnd_.nacked_space_.
+    // This nacked_space_ will tell the number of bytes that has not been acked
+    // to the sender. If this nacked_space_ is growing large, and approaching
+    // the socket buffer size, the transmission of sender may be queued.
+    // this nacked_space_ can be used to detect if the kChunkAckTimeout or
+    // kChunkAckSize are set excessively large.
+    // It is not used for now, so ignore it.
+
+    // check for transmission error
+    if (rcvwnd_.rcv_ + 1 < fseq) {
+      if (rcvwnd_.rcv_ == 0 && rcvwnd_.acked_ == 0) {
+        // peer does not realize that this portid reset
+        m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
+            "RcvData[mseq:%u, mfrag:%u, fseq:%u], "
+            "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "], "
+            "Warning[portid reset]",
+            id_.node, id_.ref,
+            mseq, mfrag, fseq,
+            rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_);
+
+        // resynchronise to the peer's sequence; rc stays success
+        rcvwnd_.rcv_ = fseq;
+      } else {
+        rc = NCSCC_RC_FAILURE;
+        // msg loss
+        m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
+            "RcvData[mseq:%u, mfrag:%u, fseq:%u], "
+            "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "], "
+            "Error[msg loss]",
+            id_.node, id_.ref,
+            mseq, mfrag, fseq,
+            rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_);
+      }
+    }
+    if (fseq <= rcvwnd_.acked_) {
+      rc = NCSCC_RC_FAILURE;
+      // unexpected retransmission
+      m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
+          "RcvData[mseq:%u, mfrag:%u, fseq:%u], "
+          "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "], "
+          "Error[unexpected retransmission]",
+          id_.node, id_.ref,
+          mseq, mfrag, fseq,
+          rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_);
+    }
+  }
+  return rc;
+}
+
+void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) {
+  // update sender sequence window
+  if (sndwnd_.acked_ < fseq && fseq < sndwnd_.send_) {
+    m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
+        "RcvChkAck[fseq:%u, chunk:%u], "
+        "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "], "
+        "queue[size:%" PRIu64 "]",
+        id_.node, id_.ref,
+        fseq, chksize,
+        sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_,
+        sndqueue_.Size());
+
+    // fast forward the sndwnd_.acked_ sequence to fseq
+    sndwnd_.acked_ = fseq;
+
+    // remove a number @chksize messages out of sndqueue_ and decrease
+    // the nacked_space_ of sender
+    sndwnd_.nacked_space_ -= sndqueue_.Erase(fseq - chksize + 1, fseq);
+  } else {
+    m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
+        "RcvChkAck[fseq:%u, chunk:%u], "
+        "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "], "
+        "queue[size:%" PRIu64 "], "
+        "Error[msg disordered]",
+        id_.node, id_.ref,
+        fseq, chksize,
+        sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_,
+        sndqueue_.Size());
+  }
+}
+
+// Resends the queued message identified by @mseq and @mfrag, using the
+// length stored in its header. @fseq is used for logging only. When the
+// message is no longer in the send queue only an error is logged.
+void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag,
+    uint16_t fseq) {
+  DataMessage* queued = sndqueue_.Find(mseq, mfrag);
+  if (queued == nullptr) {
+    m_MDS_LOG_ERR("FCTRL: [me] --> [node:%x, ref:%u], "
+        "RsndData[mseq:%u, mfrag:%u, fseq:%u], "
+        "Error[msg not found]",
+        id_.node, id_.ref,
+        mseq, mfrag, fseq);
+    return;
+  }
+
+  // resend the message that was found
+  Send(queued->msg_data_, queued->header_.msg_len_);
+  m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
+      "RsndData[mseq:%u, mfrag:%u, fseq:%u], "
+      "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
+      id_.node, id_.ref,
+      mseq, mfrag, fseq,
+      sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
+}
+
+}  // end namespace mds
diff --git a/src/mds/mds_tipc_fctrl_portid.h b/src/mds/mds_tipc_fctrl_portid.h
new file mode 100644
index 0000000..8068e6e
--- /dev/null
+++ b/src/mds/mds_tipc_fctrl_portid.h
@@ -0,0 +1,87 @@
+/*      -*- OpenSAF  -*-
+ *
+ * (C) Copyright 2019 The OpenSAF Foundation
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed
+ * under the GNU Lesser General Public License Version 2.1, February 1999.
+ * The complete license can be accessed from the following location:
+ * http://opensource.org/licenses/lgpl-license.php
+ * See the Copying file included with the OpenSAF distribution for full
+ * licensing terms.
+ *
+ * Author(s): Ericsson AB
+ *
+ */
+
+#ifndef MDS_MDS_TIPC_FCTRL_PORTID_H_
+#define MDS_MDS_TIPC_FCTRL_PORTID_H_
+
+#include <linux/tipc.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <deque>
+#include "mds/mds_tipc_fctrl_msg.h"
+
+namespace mds {
+
+// Queue of outgoing data messages kept as raw pointers. Erase() and Clear()
+// delete the messages they remove.
+class MessageQueue {
+ public:
+  // Appends @msg to the end of the queue.
+  void Queue(DataMessage* msg);
+  // Returns the queued message with the given sequence and fragment number,
+  // or nullptr.
+  DataMessage* Find(uint32_t mseq, uint16_t mfrag);
+  // Removes the messages with a flow control sequence number in
+  // [fseq_from, fseq_to]; returns the sum of their header lengths.
+  uint64_t Erase(uint16_t fseq_from, uint16_t fseq_to);
+  // Number of queued messages.
+  uint64_t Size() const { return static_cast<uint64_t>(queue_.size()); }
+  // Removes and deletes all queued messages.
+  void Clear();
+
+ private:
+  std::deque<DataMessage*> queue_;
+};
+
+class TipcPortId {
+ public:
+  TipcPortId(struct tipc_portid id, int sock, uint16_t chunk_size,
+      uint64_t sock_buf_size);
+  ~TipcPortId();
+  static uint64_t GetUniqueId(struct tipc_portid id);
+  int GetSock() const { return bsrsock_; }
+  uint16_t GetCurrentSeq() { return sndwnd_.send_; }
+  bool ReceiveCapable(uint16_t sending_len);
+  void ReceiveChunkAck(uint16_t fseq, uint16_t chunk_size);
+  void SendChunkAck(uint16_t fseq, uint16_t svc_id, uint16_t chunk_size);
+  uint32_t ReceiveData(uint32_t mseq, uint16_t mfrag,
+      uint16_t fseq, uint16_t svc_id);
+  void ReceiveNack(uint32_t mseq, uint16_t mfrag, uint16_t fseq);
+  uint32_t Send(uint8_t* data, uint16_t length);
+  uint32_t Queue(const uint8_t* data, uint16_t length);
+
+  uint16_t svc_cnt_{1};  // number of service subscribed on this portid
+
+ private:
+  struct tipc_portid id_;
+  int bsrsock_;  // tipc socket to send/receive data per tipc_portid
+  uint16_t chunk_size_{5};
+  uint64_t rcv_buf_size_{0};  // estimated buffer size at receiver
+
+  struct sndwnd {
+    // sender sequence window
+    uint16_t acked_{0};  // last sequence has been acked by receiver
+    uint16_t send_{1};   // next sequence to be sent
+    uint64_t nacked_space_{0};  // total bytes are sent but not acked
+  };
+  struct sndwnd sndwnd_;
+
+  struct rcvwnd {
+    // receiver sequence window
+    uint16_t acked_{0};  // last sequence has been acked to sender
+    uint16_t rcv_{0};    // last sequence has been received
+    uint64_t nacked_space_{0};  // total bytes has not been acked
+  };
+  struct rcvwnd rcvwnd_;
+
+  MessageQueue sndqueue_;
+};
+
+}  // end namespace mds
+#endif  // MDS_MDS_TIPC_FCTRL_PORTID_H_






_______________________________________________
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel

Reply via email to