This is a collaborative patch by two participants: Thuan and Minh.

Main changes:
- Add mds_tipc_fctrl_intf.h, mds_tipc_fctrl_intf.cc: These two files
introduce new functions which are called from mds_dt_tipc.c if flow
control is enabled.
- Add mds_tipc_fctrl_portid.h, mds_tipc_fctrl_portid.cc: These files
implement the tipc portid instance, which supports the sliding window
and the mds msg queue.
- Add mds_tipc_fctrl_msg.h, mds_tipc_fctrl_msg.cc: These files define
the events and messages which are used for this solution.
---
 src/mds/Makefile.am              |  10 +-
 src/mds/mds_dt.h                 |   8 +-
 src/mds/mds_dt_tipc.c            | 188 +++++++++++++-------
 src/mds/mds_tipc_fctrl_intf.cc   | 376 +++++++++++++++++++++++++++++++++++++++
 src/mds/mds_tipc_fctrl_intf.h    |  47 +++++
 src/mds/mds_tipc_fctrl_msg.cc    | 142 +++++++++++++++
 src/mds/mds_tipc_fctrl_msg.h     | 129 ++++++++++++++
 src/mds/mds_tipc_fctrl_portid.cc | 261 +++++++++++++++++++++++++++
 src/mds/mds_tipc_fctrl_portid.h  |  87 +++++++++
 9 files changed, 1184 insertions(+), 64 deletions(-)
 create mode 100644 src/mds/mds_tipc_fctrl_intf.cc
 create mode 100644 src/mds/mds_tipc_fctrl_intf.h
 create mode 100644 src/mds/mds_tipc_fctrl_msg.cc
 create mode 100644 src/mds/mds_tipc_fctrl_msg.h
 create mode 100644 src/mds/mds_tipc_fctrl_portid.cc
 create mode 100644 src/mds/mds_tipc_fctrl_portid.h

diff --git a/src/mds/Makefile.am b/src/mds/Makefile.am
index 2d7b652..d849e8f 100644
--- a/src/mds/Makefile.am
+++ b/src/mds/Makefile.am
@@ -48,10 +48,16 @@ lib_libopensaf_core_la_SOURCES += \
 if ENABLE_TIPC_TRANSPORT
 noinst_HEADERS += src/mds/mds_dt_tipc.h \
        src/mds/mds_tipc_recvq_stats.h \
-       src/mds/mds_tipc_recvq_stats_impl.h
+       src/mds/mds_tipc_recvq_stats_impl.h \
+       src/mds/mds_tipc_fctrl_intf.h \
+       src/mds/mds_tipc_fctrl_portid.h \
+       src/mds/mds_tipc_fctrl_msg.h
 lib_libopensaf_core_la_SOURCES += src/mds/mds_dt_tipc.c \
        src/mds/mds_tipc_recvq_stats.cc \
-       src/mds/mds_tipc_recvq_stats_impl.cc
+       src/mds/mds_tipc_recvq_stats_impl.cc \
+       src/mds/mds_tipc_fctrl_intf.cc \
+       src/mds/mds_tipc_fctrl_portid.cc \
+       src/mds/mds_tipc_fctrl_msg.cc
 endif
 
 if ENABLE_TESTS
diff --git a/src/mds/mds_dt.h b/src/mds/mds_dt.h
index b645bb4..d9e8633 100644
--- a/src/mds/mds_dt.h
+++ b/src/mds/mds_dt.h
@@ -162,7 +162,7 @@ uint32_t mdtm_del_from_ref_tbl(MDS_SUBTN_REF_VAL ref);
 uint32_t mds_tmr_mailbox_processing(void);
 uint32_t mdtm_get_from_ref_tbl(MDS_SUBTN_REF_VAL ref, MDS_SVC_HDL *svc_hdl);
 uint32_t mdtm_add_frag_hdr(uint8_t *buf_ptr, uint16_t len, uint32_t seq_num,
-                           uint16_t frag_byte);
+                           uint16_t frag_byte, uint16_t fctrl_seq_num);
 uint32_t mdtm_free_reassem_msg_mem(MDS_ENCODED_MSG *msg);
 uint32_t mdtm_process_recv_data(uint8_t *buf, uint16_t len, uint64_t tipc_id,
                                 uint32_t *buff_dump);
@@ -240,9 +240,13 @@ bool mdtm_mailbox_mbx_cleanup(NCSCONTEXT arg, NCSCONTEXT 
msg);
 
 #define MDS_PROT 0xA0
 #define MDS_VERSION 0x08
-#define MDS_PROT_VER_MASK (MDS_PROT | MDS_VERSION)
+#define MDS_PROT_VER_MASK 0xFC
 #define MDTM_PRI_MASK 0x3
 
+/* MDS protocol/version for flow control */
+#define MDS_PROT_FCTRL (0xB0 | MDS_VERSION)
+#define MDS_PROT_FCTRL_ID 0x00AC13F5
+
 /* Added for the subscription changes */
 #define MDS_NCS_CHASSIS_ID (m_NCS_GET_NODE_ID & 0x00ff0000)
 #define MDS_TIPC_COMMON_ID 0x01001000
diff --git a/src/mds/mds_dt_tipc.c b/src/mds/mds_dt_tipc.c
index 86b52bb..fef1c50 100644
--- a/src/mds/mds_dt_tipc.c
+++ b/src/mds/mds_dt_tipc.c
@@ -47,6 +47,7 @@
 #include "mds_dt_tipc.h"
 #include "mds_dt_tcp_disc.h"
 #include "mds_core.h"
+#include "mds_tipc_fctrl_intf.h"
 #include "mds_tipc_recvq_stats.h"
 #include "base/osaf_utility.h"
 #include "base/osaf_poll.h"
@@ -165,20 +166,22 @@ NCS_PATRICIA_TREE mdtm_reassembly_list;
 uint32_t mdtm_global_frag_num;
 
 const unsigned int MAX_RECV_THRESHOLD = 30;
+uint8_t gl_mds_pro_ver = MDS_PROT | MDS_VERSION;
 
-static bool get_tipc_port_id(int sock, uint32_t* port_id) {
+static bool get_tipc_port_id(int sock, struct tipc_portid* port_id) {
        struct sockaddr_tipc addr;
        socklen_t sz = sizeof(addr);
 
        memset(&addr, 0, sizeof(addr));
-       *port_id = 0;
+       port_id->node = 0;
+       port_id->ref = 0;
        if (0 > getsockname(sock, (struct sockaddr *)&addr, &sz)) {
                syslog(LOG_ERR, "MDTM:TIPC Failed to get socket name, err: %s",
                       strerror(errno));
                return false;
        }
 
-       *port_id = addr.addr.id.ref;
+       *port_id = addr.addr.id;
        return true;
 }
 
@@ -240,12 +243,13 @@ uint32_t mdtm_tipc_init(NODE_ID nodeid, uint32_t 
*mds_tipc_ref)
        }
 
        /* Code for getting the self tipc random number */
-       if (!get_tipc_port_id(tipc_cb.BSRsock, mds_tipc_ref)) {
+       struct tipc_portid port_id;
+       if (!get_tipc_port_id(tipc_cb.BSRsock, &port_id)) {
                close(tipc_cb.Dsock);
                close(tipc_cb.BSRsock);
                return NCSCC_RC_FAILURE;
        }
-
+       *mds_tipc_ref = port_id.ref;
        tipc_cb.adest = ((uint64_t)(nodeid)) << 32;
        tipc_cb.adest |= *mds_tipc_ref;
        tipc_cb.node_id = nodeid;
@@ -325,6 +329,23 @@ uint32_t mdtm_tipc_init(NODE_ID nodeid, uint32_t 
*mds_tipc_ref)
                mdtm_set_transport(MDTM_TX_TYPE_TIPC);
        }
 
+       /* Get tipc socket receive buffer size */
+       int optval;
+       socklen_t optlen = sizeof(optval);
+       if (getsockopt(tipc_cb.BSRsock, SOL_SOCKET, SO_RCVBUF,
+               &optval, &optlen) != 0) {
+               syslog(LOG_ERR, "MDTM: getsockopt() failed to get rcv buf size 
to: %str",
+                       strerror(errno));
+               close(tipc_cb.Dsock);
+               close(tipc_cb.BSRsock);
+               return NCSCC_RC_FAILURE;
+       }
+
+       /* Create flow control tasks if enabled*/
+       gl_mds_pro_ver = MDS_PROT_FCTRL;
+       mds_tipc_fctrl_initialize(tipc_cb.BSRsock, port_id,
+               (uint64_t)optval, tipc_mcast_enabled);
+
        /* Create a task to receive the events and data */
        if (mdtm_create_rcv_task(tipc_cb.hdle_mdtm) != NCSCC_RC_SUCCESS) {
                syslog(LOG_ERR,
@@ -469,7 +490,7 @@ uint32_t mdtm_tipc_destroy(void)
                         NULL);
        m_NCS_IPC_RELEASE(&tipc_cb.tmr_mbx,
                          (NCS_IPC_CB)mdtm_mailbox_mbx_cleanup);
-
+       mds_tipc_fctrl_shutdown();
        /* Clear reference hdl list */
        while (mdtm_ref_hdl_list_hdr != NULL) {
                /* Store temporary the pointer of entry to be deleted */
@@ -672,36 +693,26 @@ ssize_t recvfrom_connectionless(int sd, void *buf, size_t 
nbytes, int flags,
                                                               0));
                                        if (anc_data[0] == TIPC_ERR_OVERLOAD) {
                                                LOG_ER(
-                                                   "MDTM: From <0x%"PRIx32 
":%"PRIu32 "> undeliverable message condition ancillary data: 
TIPC_ERR_OVERLOAD",
-                                                   ((struct sockaddr_tipc*) 
from)->addr.id.node,
-                                                   ((struct sockaddr_tipc*) 
from)->addr.id.ref);
-                                               m_MDS_LOG_CRITICAL(
                                                    "MDTM: From <0x%"PRIx32 
":%"PRIu32 "> undeliverable message condition ancillary data: TIPC_ERR_OVERLOAD 
",
                                                    ((struct sockaddr_tipc*) 
from)->addr.id.node,
                                                    ((struct sockaddr_tipc*) 
from)->addr.id.ref);
-                                       } else {
+                                       } else if (anc_data[0] == 
TIPC_ERR_NO_PORT){
                                                /* TIPC_ERRINFO - TIPC error
                                                 * code associated with a
                                                 * returned data message or a
                                                 * connection termination
                                                 * message */
-                                               m_MDS_LOG_DBG(
-                                                   "MDTM: undelivered message 
condition ancillary data: TIPC_ERRINFO err : %d",
-                                                   anc_data[0]);
+                                               
mds_tipc_fctrl_portid_terminate(((struct sockaddr_tipc*)from)->addr.id);
+                                       } else {
+                                               m_MDS_LOG_ERR("MDTM:: 
TIPC_ERRINFO anc_data[0]:%u", anc_data[0]);
                                        }
                                } else if (anc->cmsg_type == TIPC_RETDATA) {
-                                       /* If we set TIPC_DEST_DROPPABLE off
-                                          message (configure TIPC to return
-                                          rejected messages to the sender ) we
-                                          will hit this when we implement MDS
-                                          retransmit lost messages, can be
-                                          replaced with flow control logic */
                                        /* TIPC_RETDATA -The contents of a
                                         * returned data message */
-                                       LOG_IN(
-                                           "MDTM: undelivered message 
condition ancillary data: TIPC_RETDATA");
-                                       m_MDS_LOG_INFO(
-                                           "MDTM: undelivered message 
condition ancillary data: TIPC_RETDATA");
+                                       LOG_IN("MDTM: undelivered message 
condition ancillary data: TIPC_RETDATA");
+                                       uint16_t ret_msg_len = anc->cmsg_len - 
sizeof(*anc);
+                                       unsigned char *ret_msg = CMSG_DATA(anc);
+                                       mds_tipc_fctrl_drop_data(ret_msg, 
ret_msg_len, ((struct sockaddr_tipc*)from)->addr.id);
                                } else if (anc->cmsg_type == TIPC_DESTNAME) {
                                        if (sz == 0) {
                                                m_MDS_LOG_DBG(
@@ -822,7 +833,7 @@ static uint32_t mdtm_process_recv_events(void)
                                                                m_MDS_LOG_INFO(
                                                                    "MDTM: 
Published: ");
                                                                m_MDS_LOG_INFO(
-                                                                   "MDTM:  
<%u,%u,%u> port id <0x%08x:%u>\n",
+                                                                   "MDTM:  
<%x,%x,%x> port id <0x%08x:%u>\n",
                                                                    NTOHL(
                                                                        event.s
                                                                            .seq
@@ -841,7 +852,12 @@ static uint32_t mdtm_process_recv_events(void)
                                                                        event
                                                                            
.port
                                                                            
.ref));
-
+                                                               struct 
tipc_portid id = {
+                                                                       .node = 
NTOHL(event.port.node),
+                                                                       .ref = 
NTOHL(event.port.ref)
+                                                               };
+                                                               uint32_t type = 
NTOHL(event.s.seq.type);
+                                                               
mds_tipc_fctrl_portid_up(id, type);
                                                                if 
(NCSCC_RC_SUCCESS !=
                                                                    
mdtm_process_discovery_events(
                                                                        
TIPC_PUBLISHED,
@@ -873,7 +889,12 @@ static uint32_t mdtm_process_recv_events(void)
                                                                        event
                                                                            
.port
                                                                            
.ref));
-
+                                                               struct 
tipc_portid id = {
+                                                                       .node = 
NTOHL(event.port.node),
+                                                                       .ref = 
NTOHL(event.port.ref)
+                                                               };
+                                                               uint32_t type = 
NTOHL(event.s.seq.type);
+                                                               
mds_tipc_fctrl_portid_down(id, type);
                                                                if 
(NCSCC_RC_SUCCESS !=
                                                                    
mdtm_process_discovery_events(
                                                                        
TIPC_WITHDRAWN,
@@ -986,8 +1007,6 @@ static uint32_t mdtm_process_recv_events(void)
                                                break;
                                        }
                                        if (recd_bytes == 0) {
-                                               m_MDS_LOG_DBG(
-                                                   "MDTM: recd bytes=0 on 
received on sock, abnormal/unknown/hack  condition. Ignoring");
                                                break;
                                        }
                                        data = inbuf;
@@ -1076,9 +1095,10 @@ static uint32_t mdtm_process_recv_events(void)
                                                            &buff_dump);
                                                }
 #else
-                                               mdtm_process_recv_data(
-                                                   &inbuf[2], recd_bytes - 2,
-                                                   tipc_id, &buff_dump);
+                                               if 
(mds_tipc_fctrl_rcv_data(inbuf, recd_bytes, client_addr.addr.id)
+                                                   == NCSCC_RC_SUCCESS) {
+                                                       
mdtm_process_recv_data(&inbuf[2], recd_bytes - 2, tipc_id, &buff_dump);
+                                               }
 #endif
                                        } else {
                                                uint64_t tipc_id;
@@ -1873,9 +1893,9 @@ uint32_t mds_mdtm_svc_install_tipc(PW_ENV_ID pwe_id, 
MDS_SVC_ID svc_id,
        server_addr.addr.nameseq.upper = server_inst;
 
        /* The self tipc random port number */
-       uint32_t port_id = 0;
+  struct tipc_portid port_id;
        get_tipc_port_id(tipc_cb.BSRsock, &port_id);
-       m_MDS_LOG_NOTIFY("MDTM: install_tipc : <p:%u,s:%u,i:%u>", port_id,
+       m_MDS_LOG_NOTIFY("MDTM: install_tipc : <p:%u,s:%u,i:%u>", port_id.ref,
                         server_type, server_inst);
 
        if (0 != bind(tipc_cb.BSRsock, (struct sockaddr *)&server_addr,
@@ -2495,6 +2515,7 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ *req)
         */
        uint32_t status = 0;
        uint32_t sum_mds_hdr_plus_mdtm_hdr_plus_len;
+  uint16_t fctrl_seq_num = 0;
        int version = req->msg_arch_word & 0x7;
        if (version > 1) {
                sum_mds_hdr_plus_mdtm_hdr_plus_len =
@@ -2583,11 +2604,16 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ *req)
                            mdtm_add_mds_hdr(buffer_ack, req)) {
                                return NCSCC_RC_FAILURE;
                        }
-
+                 /* if sndqueue is capable, then obtain the current sending 
seq */
+                 if (mds_tipc_fctrl_sndqueue_capable(tipc_id, len, 
&fctrl_seq_num)
+                     == NCSCC_RC_FAILURE){
+                   m_MDS_LOG_ERR("FCTRL: Failed to send message len :%d", len);
+                   return NCSCC_RC_FAILURE;
+                 }
                        /* Add frag_hdr */
                        if (NCSCC_RC_SUCCESS !=
                            mdtm_add_frag_hdr(buffer_ack, len, frag_seq_num,
-                                             0)) {
+                                             0, fctrl_seq_num)) {
                                return NCSCC_RC_FAILURE;
                        }
 
@@ -2676,13 +2702,22 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ *req)
                                        free(body);
                                        return NCSCC_RC_FAILURE;
                                }
+                               /* if sndqueue is capable, then obtain the 
current sending seq */
+                               if (mds_tipc_fctrl_sndqueue_capable(tipc_id,
+                                       len + 
sum_mds_hdr_plus_mdtm_hdr_plus_len,
+                                       &fctrl_seq_num) == NCSCC_RC_FAILURE){
+                                       m_MDS_LOG_ERR("FCTRL: Failed to send 
message len :%d",
+                                               len + 
sum_mds_hdr_plus_mdtm_hdr_plus_len);
+                                       free(body);
+                                       return NCSCC_RC_FAILURE;
+                               }
 
                                if (NCSCC_RC_SUCCESS !=
                                    mdtm_add_frag_hdr(
                                        body,
                                        (len +
                                         sum_mds_hdr_plus_mdtm_hdr_plus_len),
-                                       frag_seq_num, 0)) {
+                                       frag_seq_num, 0, fctrl_seq_num)) {
                                        m_MDS_LOG_ERR(
                                            "MDTM: Unable to add the frag Hdr 
to the send msg\n");
                                        m_MMGR_FREE_BUFR_LIST(usrbuf);
@@ -2778,12 +2813,23 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ *req)
                                    req->msg.data.buff_info.buff);
                                return NCSCC_RC_FAILURE;
                        }
+                       /* if sndqueue is capable, then obtain the current 
sending seq */
+                       if (mds_tipc_fctrl_sndqueue_capable(tipc_id,
+                               req->msg.data.buff_info.len + 
sum_mds_hdr_plus_mdtm_hdr_plus_len,
+                               &fctrl_seq_num) == NCSCC_RC_FAILURE) {
+                               m_MDS_LOG_ERR("FCTRL: Failed to send message 
len :%d",
+                                       req->msg.data.buff_info.len + 
sum_mds_hdr_plus_mdtm_hdr_plus_len);
+                               free(body);
+                               
mds_free_direct_buff(req->msg.data.buff_info.buff);
+                               return NCSCC_RC_FAILURE;
+                       }
+
                        if (NCSCC_RC_SUCCESS !=
                            mdtm_add_frag_hdr(
                                body,
                                req->msg.data.buff_info.len +
                                    sum_mds_hdr_plus_mdtm_hdr_plus_len,
-                               frag_seq_num, 0)) {
+                               frag_seq_num, 0, fctrl_seq_num)) {
                                m_MDS_LOG_ERR(
                                    "MDTM: Unable to add the frag Hdr to the 
send msg\n");
                                free(body);
@@ -2860,6 +2906,7 @@ uint32_t mdtm_frag_and_send(MDTM_SEND_REQ *req, uint32_t 
seq_num,
        uint16_t i = 1;
        uint16_t frag_val = 0;
        uint32_t sum_mds_hdr_plus_mdtm_hdr_plus_len;
+       uint16_t fctrl_seq_num = 0;
        int version = req->msg_arch_word & 0x7;
        uint32_t ret = NCSCC_RC_SUCCESS;
 
@@ -2938,9 +2985,18 @@ uint32_t mdtm_frag_and_send(MDTM_SEND_REQ *req, uint32_t 
seq_num,
                                        return NCSCC_RC_FAILURE;
                                }
                        }
+                       /* if sndqueue is capable, then obtain the current 
sending seq */
+                       if (mds_tipc_fctrl_sndqueue_capable(id, len_buf, 
&fctrl_seq_num)
+                               == NCSCC_RC_FAILURE) {
+                               m_MDS_LOG_ERR("FCTRL: Failed to send message 
len :%d", len_buf);
+                               m_MMGR_FREE_BUFR_LIST(usrbuf);
+                               free(body);
+                               return NCSCC_RC_FAILURE;
+                       }
+
                        if (NCSCC_RC_SUCCESS !=
                            mdtm_add_frag_hdr(body, len_buf, seq_num,
-                                             frag_val)) {
+                                             frag_val, fctrl_seq_num)) {
                                m_MDS_LOG_ERR(
                                    "MDTM: Frag hde addition failed\n");
                                m_MMGR_FREE_BUFR_LIST(usrbuf);
@@ -2996,7 +3052,7 @@ uint32_t mdtm_frag_and_send(MDTM_SEND_REQ *req, uint32_t 
seq_num,
 *********************************************************/
 
 uint32_t mdtm_add_frag_hdr(uint8_t *buf_ptr, uint16_t len, uint32_t seq_num,
-                          uint16_t frag_byte)
+                          uint16_t frag_byte, uint16_t fctrl_seq_num)
 {
        /* Add the FRAG HDR to the Buffer */
        uint8_t *data;
@@ -3013,9 +3069,17 @@ uint32_t mdtm_add_frag_hdr(uint8_t *buf_ptr, uint16_t 
len, uint32_t seq_num,
                                    5); /* 2 bytes for keeping len, to cross
                                           check at the receiver end */
 #else
-       ncs_encode_16bit(&data, len - MDTM_FRAG_HDR_LEN -
-                                   2); /* 2 bytes for keeping len, to cross
-                                          check at the receiver end */
+       /* Next 2 bytes for keeping len, to cross check at the receiver end.
+        * Used to be encoded as below:
+        *
+        * ncs_encode_16bit(&data, len - MDTM_FRAG_HDR_LEN - 2);
+        *
+        * However, these 2 bytes have been never examined at receiver. As 
backward
+        * compatibility, the fragment header and mds data header can't be 
extended,
+        * hereafter, these 2 bytes will be used as sequence number in flow 
control
+        * (per tipc portid)
+        * */
+       ncs_encode_16bit(&data, fctrl_seq_num);
 #endif
        return NCSCC_RC_SUCCESS;
 }
@@ -3060,21 +3124,25 @@ static uint32_t mdtm_sendto(uint8_t *buffer, uint16_t 
buff_len,
                buffer[4] = checksum;
        }
 #endif
-
-       send_len = sendto(tipc_cb.BSRsock, buffer, buff_len, 0,
-                         (struct sockaddr *)&server_addr, sizeof(server_addr));
-       if (send_len == buff_len) {
-               m_MDS_LOG_INFO("MDTM: Successfully sent message");
-               return NCSCC_RC_SUCCESS;
-       } else if (send_len == -1) {
-               m_MDS_LOG_ERR("MDTM: Failed to send message err :%s",
-                             strerror(errno));
-               return NCSCC_RC_FAILURE;
-       } else {
-               m_MDS_LOG_ERR("MDTM: Failed to send message send_len :%zd",
-                             send_len);
-               return NCSCC_RC_FAILURE;
+       if (mds_tipc_fctrl_trysend(buffer, buff_len, id) == NCSCC_RC_SUCCESS) {
+               send_len = sendto(tipc_cb.BSRsock, buffer, buff_len, 0,
+                                       (struct sockaddr *)&server_addr, 
sizeof(server_addr));
+               if (send_len == buff_len) {
+                       m_MDS_LOG_INFO("MDTM: Successfully sent message");
+                       return NCSCC_RC_SUCCESS;
+               } else if (send_len == -1) {
+                       m_MDS_LOG_ERR("MDTM: Failed to send message err :%s", 
strerror(errno));
+                       // todo: it's failed to send msg, if flow control is 
enabled
+                       // the msg needs to mark unsent
+                       return NCSCC_RC_FAILURE;
+               } else {
+                       m_MDS_LOG_ERR("MDTM: Failed to send message send_len 
:%zd", send_len);
+                       // todo: it's failed to send msg, if flow control is 
enabled
+                       // the msg needs to mark unsent
+                       return NCSCC_RC_FAILURE;
+               }
        }
+       return NCSCC_RC_SUCCESS;
 }
 
 /*********************************************************
@@ -3103,12 +3171,12 @@ static uint32_t mdtm_mcast_sendto(void *buffer, size_t 
size,
        server_addr.addr.nameseq.lower = HTONL(MDS_MDTM_LOWER_INSTANCE);
        /*This can be scope-down to dest_svc_id  server_inst TBD*/
        server_addr.addr.nameseq.upper = HTONL(MDS_MDTM_UPPER_INSTANCE);
-
        int send_len =
            sendto(tipc_cb.BSRsock, buffer, size, 0,
                   (struct sockaddr *)&server_addr, sizeof(server_addr));
+
        if (send_len == size) {
-               m_MDS_LOG_INFO("MDTM: Successfully sent message");
+               m_MDS_LOG_INFO("MDTM: Successfully sent mcast message");
                return NCSCC_RC_SUCCESS;
        } else {
                m_MDS_LOG_ERR("MDTM: Failed to send Multicast message err :%s",
@@ -3151,7 +3219,7 @@ static uint32_t mdtm_add_mds_hdr(uint8_t *buffer, 
MDTM_SEND_REQ *req)
        uint8_t *ptr;
        ptr = buffer;
 
-       prot_ver |= MDS_PROT | MDS_VERSION | ((uint8_t)(req->pri - 1));
+       prot_ver |= gl_mds_pro_ver | ((uint8_t)(req->pri - 1));
        enc_snd_type = (req->msg.encoding << 6);
        enc_snd_type |= (uint8_t)req->snd_type;
 
diff --git a/src/mds/mds_tipc_fctrl_intf.cc b/src/mds/mds_tipc_fctrl_intf.cc
new file mode 100644
index 0000000..91b9107
--- /dev/null
+++ b/src/mds/mds_tipc_fctrl_intf.cc
@@ -0,0 +1,376 @@
+/*      -*- OpenSAF  -*-
+ *
+ * (C) Copyright 2019 The OpenSAF Foundation
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed
+ * under the GNU Lesser General Public License Version 2.1, February 1999.
+ * The complete license can be accessed from the following location:
+ * http://opensource.org/licenses/lgpl-license.php
+ * See the Copying file included with the OpenSAF distribution for full
+ * licensing terms.
+ *
+ * Author(s): Ericsson AB
+ *
+ */
+#include "mds/mds_tipc_fctrl_intf.h"
+
+#include <poll.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <sys/poll.h>
+#include <unistd.h>
+
+#include <map>
+#include <mutex>
+
+#include "base/ncssysf_def.h"
+#include "base/ncssysf_tsk.h"
+
+#include "mds/mds_log.h"
+#include "mds/mds_tipc_fctrl_portid.h"
+#include "mds/mds_tipc_fctrl_msg.h"
+
+using mds::Event;
+using mds::TipcPortId;
+using mds::DataMessage;
+using mds::ChunkAck;
+using mds::HeaderMessage;
+
+namespace {
+// Whether multicast/broadcast sending is enabled (set at initialization).
+// todo: to be removed once flow control supports multicast/broadcast
+bool is_mcast_enabled = true;
+
+// Mailbox from which the event loop receives flow-control events
+SYSF_MBX mbx_events;
+
+// NCS task handle (given to create_ncs_task() and ncs_task_release())
+NCSCONTEXT p_task_hdl = nullptr;
+
+// Data socket and tipc port id, both supplied at initialization
+int data_sock_fd = 0;
+struct tipc_portid snd_rcv_portid;
+
+// Receive buffer size of the socket.
+// todo: This size is read on the local node and used as an estimate of the
+// buffer size on the receiver side; in fact it could be different on the
+// receivers (the other nodes). For now we assume that all nodes in the
+// cluster have set the same tipc buffer size
+uint64_t sock_buf_size = 0;
+
+// Map of key: unique id (adest), value: TipcPortId instance.
+// The unique id is derived from struct tipc_portid
+std::map<uint64_t, TipcPortId*> portid_map;
+std::mutex portid_map_mutex;  // held while portid_map is accessed
+
+// Chunk ack parameters
+// todo: The chunk ack size should be configurable
+uint16_t kChunkAckSize = 3;
+
+// Returns the TipcPortId stored in portid_map for |id|, or nullptr when no
+// entry exists. The map key is TipcPortId::GetUniqueId(id). The callers
+// visible in this file hold portid_map_mutex while calling.
+TipcPortId* portid_lookup(struct tipc_portid id) {
+  const auto entry = portid_map.find(TipcPortId::GetUniqueId(id));
+  return entry != portid_map.end() ? entry->second : nullptr;
+}
+
+// Dispatches one flow-control event to the TipcPortId it addresses.
+//
+// When no TipcPortId is registered for evt.id_, only a kEvtRcvData event is
+// acted upon: a new instance is created, stored in portid_map and handed the
+// data; every other event type is ignored. For a registered port id the
+// event is forwarded to the matching TipcPortId member function.
+//
+// Returns the result of TipcPortId::ReceiveData() for kEvtRcvData events and
+// NCSCC_RC_SUCCESS in all other cases.
+uint32_t process_flow_event(const Event evt) {
+  const auto kind = evt.type_;
+  TipcPortId *target = portid_lookup(evt.id_);
+
+  if (target == nullptr) {
+    // Unknown port id: only incoming data may create a new entry.
+    if (kind != Event::Type::kEvtRcvData) return NCSCC_RC_SUCCESS;
+
+    target = new TipcPortId(evt.id_, data_sock_fd, kChunkAckSize,
+        sock_buf_size);
+    portid_map[TipcPortId::GetUniqueId(evt.id_)] = target;
+    return target->ReceiveData(evt.mseq_, evt.mfrag_,
+        evt.fseq_, evt.svc_id_);
+  }
+
+  uint32_t result = NCSCC_RC_SUCCESS;
+  if (kind == Event::Type::kEvtRcvData) {
+    result = target->ReceiveData(evt.mseq_, evt.mfrag_,
+        evt.fseq_, evt.svc_id_);
+  } else if (kind == Event::Type::kEvtRcvChunkAck) {
+    target->ReceiveChunkAck(evt.fseq_, evt.chunk_size_);
+  } else if (kind == Event::Type::kEvtSendChunkAck) {
+    target->SendChunkAck(evt.fseq_, evt.svc_id_, evt.chunk_size_);
+  } else if (kind == Event::Type::kEvtDropData) {
+    target->ReceiveNack(evt.mseq_, evt.mfrag_,
+        evt.fseq_);
+  }
+  return result;
+}
+
+// Event loop of the flow-control task.
+//
+// Polls the selection object of mbx_events and, whenever it is readable,
+// takes one Event from the mailbox, processes it with process_flow_event()
+// while holding portid_map_mutex, and frees it. A poll() interrupted by a
+// signal (EINTR) is retried; any other poll() failure is logged and ends
+// the loop.
+//
+// Returns NCSCC_RC_SUCCESS once the loop has ended.
+uint32_t process_all_events(void) {
+  enum { FD_FCTRL = 0, NUM_FDS };
+
+  int poll_tmo = MDTM_TIPC_POLL_TIMEOUT;
+  while (true) {
+    int pollres;
+    struct pollfd pfd[NUM_FDS] = {{0}};
+
+    pfd[FD_FCTRL].fd =
+        ncs_ipc_get_sel_obj(&mbx_events).rmv_obj;
+    pfd[FD_FCTRL].events = POLLIN;
+
+    pollres = poll(pfd, NUM_FDS, poll_tmo);
+
+    if (pollres == -1) {
+      if (errno == EINTR) continue;
+      m_MDS_LOG_ERR("FCTRL: poll() failed:%s", strerror(errno));
+      break;
+    }
+
+    // Timeout: nothing became ready.
+    if (pollres == 0) continue;
+
+    // revents is a bit mask; POLLIN can be reported together with other
+    // flags. Comparing the whole value with POLLIN would skip such an
+    // event forever, so test the bit instead.
+    if ((pfd[FD_FCTRL].revents & POLLIN) == 0) continue;
+
+    Event *evt = reinterpret_cast<Event*>(ncs_ipc_non_blk_recv(
+        &mbx_events));
+
+    if (evt == nullptr) continue;
+
+    // Scoped lock: the mutex is released on every exit path of the block.
+    std::lock_guard<std::mutex> guard(portid_map_mutex);
+    process_flow_event(*evt);
+    delete evt;
+  }  /* while */
+  return NCSCC_RC_SUCCESS;
+}
+
+// Creates and attaches the event mailbox (mbx_events) and creates the NCS
+// task that runs process_all_events().
+//
+// task_hdl: address of the NCSCONTEXT variable that receives the handle of
+//           the created task (the caller in this file passes &p_task_hdl).
+// Returns NCSCC_RC_SUCCESS, or NCSCC_RC_FAILURE when task_hdl is null, the
+// mailbox cannot be created/attached, or the task cannot be created.
+uint32_t create_ncs_task(void *task_hdl) {
+  if (task_hdl == nullptr) {
+    m_MDS_LOG_ERR("FCTRL: task handle pointer is NULL");
+    return NCSCC_RC_FAILURE;
+  }
+
+  int policy = SCHED_OTHER; /*root defaults */
+  int max_prio = sched_get_priority_max(policy);
+  int min_prio = sched_get_priority_min(policy);
+  int prio_val = ((max_prio - min_prio) * 0.87);
+
+  if (m_NCS_IPC_CREATE(&mbx_events) != NCSCC_RC_SUCCESS) {
+    m_MDS_LOG_ERR("m_NCS_IPC_CREATE failed");
+    return NCSCC_RC_FAILURE;
+  }
+  if (m_NCS_IPC_ATTACH(&mbx_events) != NCSCC_RC_SUCCESS) {
+    m_MDS_LOG_ERR("m_NCS_IPC_ATTACH failed");
+    return NCSCC_RC_FAILURE;
+  }
+  // task_hdl already is the address of the caller's handle variable.
+  // Passing &task_hdl would store the new handle in this function's own
+  // parameter copy and leave the caller's handle unset, so that the later
+  // ncs_task_release() would be called with a null handle.
+  if (ncs_task_create((NCS_OS_CB)process_all_events, 0,
+          "OSAF_MDS", prio_val, policy, NCS_MDTM_STACKSIZE,
+          static_cast<NCSCONTEXT*>(task_hdl)) != NCSCC_RC_SUCCESS) {
+    m_MDS_LOG_ERR("FCTRL: Task Creation-failed:\n");
+    return NCSCC_RC_FAILURE;
+  }
+
+  m_MDS_LOG_NOTIFY("FCTRL: Start process_all_events");
+  return NCSCC_RC_SUCCESS;
+}
+
+}  // end local namespace
+
+// Initializes flow control: records the configuration and creates the
+// event task.
+//
+// dgramsock:     data socket handed to every TipcPortId instance
+// id:            tipc port id, stored in snd_rcv_portid
+// rcv_buf_size:  socket receive buffer size handed to every TipcPortId
+// mcast_enabled: stored in is_mcast_enabled
+// Returns NCSCC_RC_SUCCESS, or NCSCC_RC_FAILURE when the task cannot be
+// created.
+uint32_t mds_tipc_fctrl_initialize(int dgramsock, struct tipc_portid id,
+    uint64_t rcv_buf_size, bool mcast_enabled) {
+  // Store the configuration before the event task is created, so that the
+  // task can never observe the default (zero) socket or buffer size when
+  // it creates a TipcPortId.
+  data_sock_fd = dgramsock;
+  snd_rcv_portid = id;
+  sock_buf_size = rcv_buf_size;
+  is_mcast_enabled = mcast_enabled;
+
+  if (create_ncs_task(&p_task_hdl) !=
+      NCSCC_RC_SUCCESS) {
+    m_MDS_LOG_ERR("FCTRL: Start of the Created Task-failed:\n");
+    return NCSCC_RC_FAILURE;
+  }
+
+  m_MDS_LOG_NOTIFY("FCTRL: Initialize [node:%x, ref:%u]",
+      id.node, id.ref);
+
+  return NCSCC_RC_SUCCESS;
+}
+
+uint32_t mds_tipc_fctrl_shutdown(void) {
+  if (ncs_task_release(p_task_hdl) != NCSCC_RC_SUCCESS) {
+    m_MDS_LOG_ERR("FCTRL: Stop of the Created Task-failed:\n");
+  }
+  return NCSCC_RC_SUCCESS;  // a failed release is only logged, not reported
+}
+
+uint32_t mds_tipc_fctrl_sndqueue_capable(struct tipc_portid id, uint16_t len,
+          uint16_t* next_seq) {
+  // Stores in @next_seq the sequence number of the next message to @id.
+  // @len is not consulted here.
+  portid_map_mutex.lock();
+  uint32_t status = NCSCC_RC_SUCCESS;
+  TipcPortId *peer = portid_lookup(id);
+  if (peer == nullptr) {
+    m_MDS_LOG_ERR("FCTRL: PortId not found [node:%x, ref:%u] line:%u",
+        id.node, id.ref, __LINE__);
+    status = NCSCC_RC_FAILURE;
+  } else {
+    // the outgoing message will carry the current send sequence number
+    *next_seq = peer->GetCurrentSeq();
+  }
+
+  portid_map_mutex.unlock();
+
+  return status;
+}
+
+uint32_t mds_tipc_fctrl_trysend(const uint8_t *buffer, uint16_t len,
+    struct tipc_portid id) {
+  uint32_t rc = NCSCC_RC_SUCCESS;
+  // Records a copy of the outgoing message in the send queue of portid @id.
+  portid_map_mutex.lock();
+
+  TipcPortId *portid = portid_lookup(id);
+  if (portid == nullptr) {
+    m_MDS_LOG_ERR("FCTRL: PortId not found [node:%x, ref:%u] line:%u",
+        id.node, id.ref, __LINE__);
+    rc = NCSCC_RC_FAILURE;
+  } else {
+    rc = portid->Queue(buffer, len);  // report the queueing result to caller
+  }
+
+  portid_map_mutex.unlock();
+
+  return rc;
+}
+
+uint32_t mds_tipc_fctrl_portid_up(struct tipc_portid id, uint32_t type) {
+  MDS_SVC_ID svc_id = (uint16_t)(type & MDS_EVENT_MASK_FOR_SVCID);
+
+  portid_map_mutex.lock();
+
+  // Create the portid on its first service, otherwise count one more service
+  TipcPortId *portid = portid_lookup(id);
+  uint64_t uid = TipcPortId::GetUniqueId(id);
+  if (portid == nullptr) {
+    portid_map[uid] = portid = new TipcPortId(id, data_sock_fd,
+        kChunkAckSize, sock_buf_size);
+    m_MDS_LOG_NOTIFY("FCTRL: Add portid[node:%x, ref:%u svc_id:%u], svc_cnt:%u",
+        id.node, id.ref, svc_id, portid->svc_cnt_);
+  } else {
+    portid->svc_cnt_++;
+    m_MDS_LOG_NOTIFY("FCTRL: Add svc[node:%x, ref:%u svc_id:%u], svc_cnt:%u",
+        id.node, id.ref, svc_id, portid->svc_cnt_);
+  }
+
+  portid_map_mutex.unlock();
+
+  return NCSCC_RC_SUCCESS;
+}
+
+uint32_t mds_tipc_fctrl_portid_down(struct tipc_portid id, uint32_t type) {
+  MDS_SVC_ID svc_id = (uint16_t)(type & MDS_EVENT_MASK_FOR_SVCID);
+
+  portid_map_mutex.lock();
+
+  // One service less on this portid; the portid itself stays in the map
+  TipcPortId *portid = portid_lookup(id);
+  if (portid != nullptr) {
+    if (portid->svc_cnt_ > 0) portid->svc_cnt_--;  // never wrap below zero
+    m_MDS_LOG_NOTIFY("FCTRL: Remove svc[node:%x, ref:%u svc_id:%u], svc_cnt:%u",
+        id.node, id.ref, svc_id, portid->svc_cnt_);
+  }
+  portid_map_mutex.unlock();
+
+  return NCSCC_RC_SUCCESS;
+}
+
+uint32_t mds_tipc_fctrl_portid_terminate(struct tipc_portid id) {
+  portid_map_mutex.lock();
+
+  // Unlink the portid from the map, then destroy the instance
+  TipcPortId *victim = portid_lookup(id);
+  if (victim != nullptr) {
+    portid_map.erase(TipcPortId::GetUniqueId(id));
+    delete victim;
+    m_MDS_LOG_NOTIFY("FCTRL: Remove portid[node:%x, ref:%u]", id.node, id.ref);
+  }
+
+  portid_map_mutex.unlock();
+
+  return NCSCC_RC_SUCCESS;
+}
+
+uint32_t mds_tipc_fctrl_drop_data(uint8_t *buffer, uint16_t len,
+    struct tipc_portid id) {
+  // Turns a message that tipc reported as undelivered into an event for the
+  // event thread. Returns NCSCC_RC_FAILURE only if the event cannot be
+  // queued. @len is not consulted.
+  HeaderMessage header;
+  header.Decode(buffer);
+  // only messages carrying the flow control protocol version are handled
+  if ((header.pro_ver_ & MDS_PROT_VER_MASK) != MDS_PROT_FCTRL) {
+    return NCSCC_RC_SUCCESS;
+  }
+  Event *evt = nullptr;
+  if (header.pro_id_ != MDS_PROT_FCTRL_ID) {
+    // undelivered data message
+    DataMessage data;
+    data.Decode(buffer);
+    evt = new Event(Event::Type::kEvtDropData, id, data.svc_id_,
+        header.mseq_, header.mfrag_, header.fseq_);
+  } else if (header.msg_type_ == ChunkAck::kChunkAckMsgType) {
+    // undelivered chunk ack: queue a kEvtSendChunkAck event for it
+    ChunkAck ack;
+    ack.Decode(buffer);
+    evt = new Event(Event::Type::kEvtSendChunkAck, id, ack.svc_id_,
+        header.mseq_, header.mfrag_, ack.acked_fseq_, ack.chunk_size_);
+  }
+  if (evt == nullptr) {
+    // flow control message of a type that needs no handling here
+    return NCSCC_RC_SUCCESS;
+  }
+  // the event thread deletes the event after processing it
+  if (m_NCS_IPC_SEND(&mbx_events, evt,
+      NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) {
+    m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events\n");
+    delete evt;  // never queued, so nobody else will free it
+    return NCSCC_RC_FAILURE;
+  }
+
+  return NCSCC_RC_SUCCESS;
+}
+
+uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t len,
+    struct tipc_portid id) {
+  HeaderMessage header;
+  header.Decode(buffer);
+  // if mds support flow control
+  if ((header.pro_ver_ & MDS_PROT_VER_MASK) == MDS_PROT_FCTRL) {
+    if (header.pro_id_ == MDS_PROT_FCTRL_ID) {
+      if (header.msg_type_ == ChunkAck::kChunkAckMsgType) {
+        // receive single ack message
+        ChunkAck ack;
+        ack.Decode(buffer);
+        // send to the event thread, which deletes the event after use
+        Event *evt = new Event(Event::Type::kEvtRcvChunkAck, id, ack.svc_id_,
+            header.mseq_, header.mfrag_, ack.acked_fseq_, ack.chunk_size_);
+        if (m_NCS_IPC_SEND(&mbx_events, evt,
+            NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) {
+          m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events\n");
+          delete evt;  // never queued, so nobody else will free it
+        }
+        // failure tells the (legacy) tipc receiving thread to skip this msg
+        return NCSCC_RC_FAILURE;
+      }
+    } else {
+      // receive data message
+      DataMessage data;
+      data.Decode(buffer);
+      // todo: skip mcast/bcast, revisit
+      if ((data.snd_type_ == MDS_SENDTYPE_BCAST ||
+          data.snd_type_ == MDS_SENDTYPE_RBCAST) && is_mcast_enabled) {
+        return NCSCC_RC_SUCCESS;
+      }
+      portid_map_mutex.lock();
+      uint32_t rc = process_flow_event(Event(Event::Type::kEvtRcvData,
+          id, data.svc_id_, header.mseq_, header.mfrag_, header.fseq_));
+      portid_map_mutex.unlock();
+      return rc;
+    }
+  }
+  return NCSCC_RC_SUCCESS;
+}
diff --git a/src/mds/mds_tipc_fctrl_intf.h b/src/mds/mds_tipc_fctrl_intf.h
new file mode 100644
index 0000000..85a058f
--- /dev/null
+++ b/src/mds/mds_tipc_fctrl_intf.h
@@ -0,0 +1,47 @@
+/*      -*- OpenSAF  -*-
+ *
+ * (C) Copyright 2019 The OpenSAF Foundation
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed
+ * under the GNU Lesser General Public License Version 2.1, February 1999.
+ * The complete license can be accessed from the following location:
+ * http://opensource.org/licenses/lgpl-license.php
+ * See the Copying file included with the OpenSAF distribution for full
+ * licensing terms.
+ *
+ * Author(s): Ericsson AB
+ *
+ */
+
+#ifndef MDS_MDS_TIPC_FCTRL_INTF_H_
+#define MDS_MDS_TIPC_FCTRL_INTF_H_
+
+#include <linux/tipc.h>
+#include <stdbool.h>
+#include <stdint.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+uint32_t mds_tipc_fctrl_initialize(int dgramsock, struct tipc_portid id,
+    uint64_t rcv_buf_size, bool mcast_enabled);
+uint32_t mds_tipc_fctrl_shutdown(void);
+uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t len,
+    struct tipc_portid id);  // msg received from @id
+uint32_t mds_tipc_fctrl_portid_up(struct tipc_portid id, uint32_t type);
+uint32_t mds_tipc_fctrl_portid_down(struct tipc_portid id, uint32_t type);
+uint32_t mds_tipc_fctrl_portid_terminate(struct tipc_portid id);
+uint32_t mds_tipc_fctrl_drop_data(uint8_t *buffer, uint16_t len,
+    struct tipc_portid id);  // msg that tipc could not deliver
+uint32_t mds_tipc_fctrl_sndqueue_capable(struct tipc_portid id, uint16_t len,
+    uint16_t* next_seq);  // out: sequence number for the next msg
+uint32_t mds_tipc_fctrl_trysend(const uint8_t *buffer, uint16_t len,
+    struct tipc_portid id);  // queues a copy of an outgoing msg
+#ifdef __cplusplus
+}
+#endif
+
+#endif  // MDS_MDS_TIPC_FCTRL_INTF_H_
diff --git a/src/mds/mds_tipc_fctrl_msg.cc b/src/mds/mds_tipc_fctrl_msg.cc
new file mode 100644
index 0000000..abd38d3
--- /dev/null
+++ b/src/mds/mds_tipc_fctrl_msg.cc
@@ -0,0 +1,142 @@
+/*      -*- OpenSAF  -*-
+ *
+ * (C) Copyright 2019 The OpenSAF Foundation
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed
+ * under the GNU Lesser General Public License Version 2.1, February 1999.
+ * The complete license can be accessed from the following location:
+ * http://opensource.org/licenses/lgpl-license.php
+ * See the Copying file included with the OpenSAF distribution for full
+ * licensing terms.
+ *
+ * Author(s): Ericsson AB
+ *
+ */
+
+#include "mds/mds_tipc_fctrl_msg.h"
+#include "base/ncssysf_def.h"
+
+namespace mds {
+HeaderMessage::HeaderMessage(uint16_t msg_len, uint32_t mseq,
+    uint16_t mfrag, uint16_t fseq):
+  // a header built this way is always tagged with the flow control
+  // protocol version; protocol id and message type start out as 0
+  msg_len_(msg_len), mseq_(mseq), mfrag_(mfrag), fseq_(fseq),
+  pro_ver_(MDS_PROT_FCTRL), pro_id_(0), msg_type_(0) {
+}
+
+void HeaderMessage::Encode(uint8_t *msg) {
+  // Each field is written at its fixed offset inside @msg.
+
+  // octets 0-1: message length
+  uint8_t *cursor = msg;
+  ncs_encode_16bit(&cursor, msg_len_);
+  // octets 2-5: sequence number
+  cursor = msg + 2;
+  ncs_encode_32bit(&cursor, mseq_);
+  // octets 6-7: fragment number
+  cursor = msg + 6;
+  ncs_encode_16bit(&cursor, mfrag_);
+  // skip length_check: oct8&9
+  // octet 10: protocol version
+  cursor = msg + 10;
+  ncs_encode_8bit(&cursor, MDS_PROT_FCTRL);
+}
+
+void HeaderMessage::Decode(uint8_t *msg) {
+  // Each field is read from its fixed offset inside @msg.
+
+  // octets 0-1: message length
+  uint8_t *cursor = msg;
+  msg_len_ = ncs_decode_16bit(&cursor);
+  // octets 2-5: sequence number
+  cursor = msg + 2;
+  mseq_ = ncs_decode_32bit(&cursor);
+  // octets 6-7: fragment number
+  cursor = msg + 6;
+  mfrag_ = ncs_decode_16bit(&cursor);
+  // octet 10: protocol version
+  cursor = msg + 10;
+  pro_ver_ = ncs_decode_8bit(&cursor);
+  const bool has_fctrl = (pro_ver_ & MDS_PROT_VER_MASK) == MDS_PROT_FCTRL;
+  if (has_fctrl) {
+    // octets 8-9: flow control sequence number
+    cursor = msg + 8;
+    fseq_ = ncs_decode_16bit(&cursor);
+    // octets 11-14: protocol identifier
+    cursor = msg + 11;
+    pro_id_ = ncs_decode_32bit(&cursor);
+    if (pro_id_ == MDS_PROT_FCTRL_ID) {
+      // octet 15: message type, present in flow control messages only
+      cursor = msg + 15;
+      msg_type_ = ncs_decode_8bit(&cursor);
+    }
+  } else if (mfrag_ != 0) {
+    // a fragment carrying a non-zero fseq is treated as flow controlled
+    cursor = msg + 8;
+    fseq_ = ncs_decode_16bit(&cursor);
+    if (fseq_ != 0) pro_ver_ = MDS_PROT_FCTRL;
+  }
+}
+
+void DataMessage::Decode(uint8_t *msg) {
+  // service id, located through the mds header layout constants
+  uint8_t *cursor = msg + (MDTM_PKT_TYPE_OFFSET +
+                           MDTM_FRAG_HDR_LEN +
+                           MDS_HEADER_RCVR_SVC_ID_POSITION);
+  svc_id_ = ncs_decode_16bit(&cursor);
+
+  // send type: only the low 6 bits of octet 17 are kept
+  cursor = msg + 17;
+  const uint8_t raw_snd_type = ncs_decode_8bit(&cursor);
+  snd_type_ = raw_snd_type & 0x3f;
+}
+
+DataMessage::~DataMessage() {
+  if (msg_data_ != nullptr) {
+    delete[] msg_data_;  // allocated with new uint8_t[] in TipcPortId::Queue
+    msg_data_ = nullptr;
+  }
+}
+
+ChunkAck::ChunkAck(uint16_t svc_id, uint16_t fseq, uint16_t chunk_size):
+    msg_type_(kChunkAckMsgType), svc_id_(svc_id), acked_fseq_(fseq),
+    chunk_size_(chunk_size) {  // @fseq is the sequence number being acked
+}
+
+void ChunkAck::Encode(uint8_t *msg) {
+  // Writes the chunk ack fields at their fixed offsets, octet 11 onwards.
+  // octets 11-14: protocol identifier
+  uint8_t *cursor = msg + 11;
+  ncs_encode_32bit(&cursor, MDS_PROT_FCTRL_ID);
+  // octet 15: message type
+  cursor = msg + 15;
+  ncs_encode_8bit(&cursor, kChunkAckMsgType);
+  // octets 16-17: service id
+  cursor = msg + 16;
+  ncs_encode_16bit(&cursor, svc_id_);
+  // octets 18-19: acked flow control sequence number
+  cursor = msg + 18;
+  ncs_encode_16bit(&cursor, acked_fseq_);
+  // octets 20-21: chunk size
+  cursor = msg + 20;
+  ncs_encode_16bit(&cursor, chunk_size_);
+}
+
+void ChunkAck::Decode(uint8_t *msg) {
+  // Reads the chunk ack fields from their fixed offsets, octet 16 onwards.
+
+  // octets 16-17: service id
+  uint8_t *cursor = msg + 16;
+  svc_id_ = ncs_decode_16bit(&cursor);
+  // octets 18-19: acked flow control sequence number
+  cursor = msg + 18;
+  acked_fseq_ = ncs_decode_16bit(&cursor);
+  // octets 20-21: chunk size
+  cursor = msg + 20;
+  chunk_size_ = ncs_decode_16bit(&cursor);
+}
+
+}  // end namespace mds
diff --git a/src/mds/mds_tipc_fctrl_msg.h b/src/mds/mds_tipc_fctrl_msg.h
new file mode 100644
index 0000000..677f256
--- /dev/null
+++ b/src/mds/mds_tipc_fctrl_msg.h
@@ -0,0 +1,129 @@
+/*      -*- OpenSAF  -*-
+ *
+ * (C) Copyright 2019 The OpenSAF Foundation
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed
+ * under the GNU Lesser General Public License Version 2.1, February 1999.
+ * The complete license can be accessed from the following location:
+ * http://opensource.org/licenses/lgpl-license.php
+ * See the Copying file included with the OpenSAF distribution for full
+ * licensing terms.
+ *
+ * Author(s): Ericsson AB
+ *
+ */
+
+#ifndef MDS_MDS_TIPC_FCTRL_MSG_H_
+#define MDS_MDS_TIPC_FCTRL_MSG_H_
+
+#include <linux/tipc.h>
+#include "base/ncssysf_ipc.h"
+#include "base/ncssysf_tmr.h"
+#include "mds/mds_dt.h"
+
+namespace mds {
+
+class Event {
+ public:
+  enum class Type {
+    kEvtPortIdAll = 0,
+    kEvtPortIdUp,     // event of new portid published (not used)
+    kEvtPortIdDown,   // event of portid withdrawn (not used)
+
+    kEvtDataFlowAll,
+    kEvtRcvData,       // event that received data msg from peer
+    kEvtSendChunkAck,  // event to send the ack for a chunk of N accumulative
+                       // data msgs (N>=1)
+    kEvtRcvChunkAck,   // event that received the ack for a chunk of N
+                       // accumulative data msgs (N>=1)
+    kEvtSendSelectiveAck,  // event to send the ack for a number of selective
+                           // data msgs (not supported)
+    kEvtRcvSelectiveAck,   // event that received the ack for a number of
+                           // selective data msgs (not supported)
+    kEvtDropData,          // event reported from tipc that a message is not
+                           // delivered
+  };
+  NCS_IPC_MSG next_{0};  // mailbox link; keep first, events travel as IPC msgs
+  Type type_;
+
+  // Used for flow event only; zeroed when the one-argument constructor is used
+  struct tipc_portid id_{};
+  uint16_t svc_id_{0};
+  uint32_t mseq_{0};
+  uint16_t mfrag_{0};
+  uint16_t fseq_{0};
+  uint16_t chunk_size_{1};
+  explicit Event(Type type):type_(type) {}
+  Event(Type type, struct tipc_portid id, uint16_t svc_id,
+      uint32_t mseq, uint16_t mfrag, uint16_t f_seg_num):
+    id_(id), svc_id_(svc_id),
+    mseq_(mseq), mfrag_(mfrag), fseq_(f_seg_num) {
+    type_ = type;
+  }
+  Event(Type type, struct tipc_portid id, uint16_t svc_id, uint32_t mseq,
+      uint16_t mfrag, uint16_t f_seg_num, uint16_t chunk_size):
+    id_(id), svc_id_(svc_id), mseq_(mseq), mfrag_(mfrag),
+    fseq_(f_seg_num), chunk_size_(chunk_size) {
+    type_ = type;
+  }
+};
+
+class BaseMessage {  // common interface of the flow control messages
+ public:
+  virtual ~BaseMessage() {}
+  virtual void Decode(uint8_t *msg) {}  // default: nothing is read from @msg
+  virtual void Encode(uint8_t *msg) {}  // default: nothing is written to @msg
+};
+
+class HeaderMessage: public BaseMessage {
+ public:
+  uint8_t* msg_ptr_{nullptr};
+  uint16_t msg_len_{0};   // message length, octets 0-1
+  uint32_t mseq_{0};      // sequence number, octets 2-5
+  uint16_t mfrag_{0};     // fragment number, octets 6-7
+  uint16_t fseq_{0};      // flow control sequence number, octets 8-9
+  uint8_t pro_ver_{0};    // protocol version, octet 10
+  uint32_t pro_id_{0};    // protocol identifier, octets 11-14
+  uint16_t msg_type_{0};  // message type, octet 15 (flow control msgs only)
+  HeaderMessage() {}
+  HeaderMessage(uint16_t msg_len, uint32_t mseq, uint16_t mfrag,
+      uint16_t fseq);
+  virtual ~HeaderMessage() {}
+  void Decode(uint8_t *msg) override;  // fills the fields above from @msg
+  void Encode(uint8_t *msg) override;  // writes length, mseq, mfrag, version
+};
+
+class DataMessage: public BaseMessage {
+ public:
+  HeaderMessage header_;
+  uint16_t svc_id_{0};
+
+  uint8_t* msg_data_{nullptr};  // released by the destructor when not null
+  uint8_t snd_type_{0};         // low 6 bits of octet 17 of the message
+
+  DataMessage() {}
+  virtual ~DataMessage();
+  void Decode(uint8_t *msg) override;  // fills svc_id_ and snd_type_ only
+};
+
+class ChunkAck: public BaseMessage {
+ public:
+  static const uint8_t kChunkAckMsgType = 1;
+  static const uint16_t kChunkAckMsgLength = 22;  // encoded size in octets
+
+  uint8_t msg_type_{0};
+  uint16_t svc_id_{0};      // octets 16-17
+  uint16_t acked_fseq_{0};  // octets 18-19, sequence number being acked
+  uint16_t chunk_size_{1};  // octets 20-21
+  ChunkAck() {}
+  ChunkAck(uint16_t svc_id, uint16_t fseq, uint16_t chunk_size);
+  virtual ~ChunkAck() {}
+  void Encode(uint8_t *msg) override;  // writes octets 11-21 of @msg
+  void Decode(uint8_t *msg) override;  // reads octets 16-21 of @msg
+};
+
+}  // end namespace mds
+
+#endif  // MDS_MDS_TIPC_FCTRL_MSG_H_
diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc
new file mode 100644
index 0000000..24d13ee
--- /dev/null
+++ b/src/mds/mds_tipc_fctrl_portid.cc
@@ -0,0 +1,261 @@
+/*      -*- OpenSAF  -*-
+ *
+ * (C) Copyright 2019 The OpenSAF Foundation
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed
+ * under the GNU Lesser General Public License Version 2.1, February 1999.
+ * The complete license can be accessed from the following location:
+ * http://opensource.org/licenses/lgpl-license.php
+ * See the Copying file included with the OpenSAF distribution for full
+ * licensing terms.
+ *
+ * Author(s): Ericsson AB
+ *
+ */
+
+#include "mds/mds_tipc_fctrl_portid.h"
+#include "base/ncssysf_def.h"
+
+#include "mds/mds_dt.h"
+#include "mds/mds_log.h"
+
+namespace mds {
+
+void MessageQueue::Queue(DataMessage* msg) {
+  queue_.push_back(msg);  // stored by pointer; Erase()/Clear() delete it
+}
+
+DataMessage* MessageQueue::Find(uint32_t mseq, uint16_t mfrag) {
+  // linear scan from the front; the first entry matching both numbers wins
+  for (DataMessage *entry : queue_) {
+    if (entry->header_.mseq_ == mseq && entry->header_.mfrag_ == mfrag) {
+      return entry;
+    }
+  }
+  return nullptr;
+}
+
+uint64_t MessageQueue::Erase(uint16_t fseq_from, uint16_t fseq_to) {
+  // Deletes every queued message whose fseq lies in [fseq_from, fseq_to] and
+  // returns the sum of the msg_len_ of the deleted messages.
+  uint64_t freed_len = 0;
+  auto it = queue_.begin();
+  while (it != queue_.end()) {
+    DataMessage *entry = *it;
+    const uint16_t fseq = entry->header_.fseq_;
+    if (fseq < fseq_from || fseq_to < fseq) { ++it; continue; }
+    freed_len += entry->header_.msg_len_;
+    it = queue_.erase(it);
+    delete entry;
+  }
+  return freed_len;
+}
+
+void MessageQueue::Clear() {
+  // free every queued message, then drop the now dangling pointers
+  for (DataMessage *entry : queue_) {
+    delete entry;
+  }
+  queue_.clear();
+}
+
+TipcPortId::TipcPortId(struct tipc_portid id, int sock, uint16_t chksize,
+    uint64_t sock_buf_size):
+  id_(id), bsrsock_(sock), chunk_size_(chksize), rcv_buf_size_(sock_buf_size) {
+}  // svc_cnt_ and both sequence windows keep their in-class defaults
+
+TipcPortId::~TipcPortId() {
+  // delete all msgs still waiting in sndqueue_
+  sndqueue_.Clear();
+}
+
+uint64_t TipcPortId::GetUniqueId(struct tipc_portid id) {
+  // node in the upper half, ref in the lower: equivalent to the mds adest
+  const uint64_t node_part = static_cast<uint64_t>(id.node) << 32;
+  return node_part | static_cast<uint64_t>(id.ref);
+}
+
+uint32_t TipcPortId::Send(uint8_t* data, uint16_t length) {
+  // Sends @length bytes of @data as one datagram to this instance's portid.
+  // Anything but a completely sent datagram is reported as failure.
+
+  // address the peer directly by its tipc port id
+  struct sockaddr_tipc peer_addr;
+  memset(&peer_addr, 0, sizeof(peer_addr));
+  peer_addr.family = AF_TIPC;
+  peer_addr.addrtype = TIPC_ADDR_ID;
+  peer_addr.addr.id = id_;
+
+  const ssize_t sent = sendto(bsrsock_, data, length, 0,
+      reinterpret_cast<struct sockaddr *>(&peer_addr), sizeof(peer_addr));
+  if (sent != length) {
+    m_MDS_LOG_ERR("sendto() err :%s", strerror(errno));
+    return NCSCC_RC_FAILURE;
+  }
+
+  return NCSCC_RC_SUCCESS;
+}
+
+uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length) {
+  // keep a private copy of the outgoing message in sndqueue_
+  uint8_t *raw = const_cast<uint8_t*>(data);  // Decode() takes non-const
+  DataMessage *entry = new DataMessage;
+  entry->header_.Decode(raw);
+  entry->Decode(raw);
+  entry->msg_data_ = new uint8_t[length];
+  memcpy(entry->msg_data_, data, length);
+  sndqueue_.Queue(entry);
+  // one more sequence number used, @length more bytes waiting for an ack
+  sndwnd_.send_ += 1;
+  sndwnd_.nacked_space_ += length;
+  m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
+      "SndData[mseq:%u, mfrag:%u, fseq:%u, len:%u], "
+      "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
+      id_.node, id_.ref,
+      entry->header_.mseq_, entry->header_.mfrag_, entry->header_.fseq_, length,
+      sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
+  return NCSCC_RC_SUCCESS;
+}
+
+bool TipcPortId::ReceiveCapable(uint16_t sending_len) {
+  return true;  // @sending_len is ignored: always reports capable
+}
+
+void TipcPortId::SendChunkAck(uint16_t fseq, uint16_t svc_id,
+    uint16_t chksize) {
+  // zero the buffer: octets 8-9 are written by neither Encode() below and
+  // must not carry stack garbage to the peer
+  uint8_t data[ChunkAck::kChunkAckMsgLength] = {0};
+  HeaderMessage header(ChunkAck::kChunkAckMsgLength, 0, 0, fseq);
+  header.Encode(data);
+  ChunkAck sack(svc_id, fseq, chksize);
+  sack.Encode(data);
+  Send(data, ChunkAck::kChunkAckMsgLength);
+  m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
+      "SndChkAck[fseq:%u, chunk:%u]",
+      id_.node, id_.ref,
+      fseq, chksize);
+}
+
+uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
+    uint16_t fseq, uint16_t svc_id) {
+  uint32_t rc = NCSCC_RC_SUCCESS;
+  // in-order msg: fseq is not acked yet and directly follows the last received
+  if (rcvwnd_.acked_ < fseq && rcvwnd_.rcv_ + 1 == fseq) {
+    m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
+        "RcvData[mseq:%u, mfrag:%u, fseq:%u], "
+        "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "]",
+        id_.node, id_.ref,
+        mseq, mfrag, fseq,
+        rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_);
+
+    ++rcvwnd_.rcv_;
+    if (rcvwnd_.rcv_ - rcvwnd_.acked_ >= chunk_size_) {
+      // send ack for the @chunk_size_ msgs ending at fseq
+      SendChunkAck(fseq, svc_id, chunk_size_);
+      rcvwnd_.acked_ = rcvwnd_.rcv_;
+    }
+  } else {
+    // todo: update rcvwnd_.nacked_space_.
+    // This nacked_space_ will tell the number of bytes that has not been acked
+    // to the sender. If this nacked_space_ is growing large, and approaching
+    // the socket buffer size, the transmission of sender may be queued.
+    // this nacked_space_ can be used to detect if the kChunkAckTimeout or
+    // kChunkAckSize are set excessively large.
+    // It is not used for now, so ignore it.
+
+    // check for transmission error: fseq is ahead of the next expected one
+    if (rcvwnd_.rcv_ + 1 < fseq) {
+      if (rcvwnd_.rcv_ == 0 && rcvwnd_.acked_ == 0) {
+        // peer does not realize that this portid reset: adopt the peer's fseq
+        m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
+            "RcvData[mseq:%u, mfrag:%u, fseq:%u], "
+            "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "], "
+            "Warning[portid reset]",
+            id_.node, id_.ref,
+            mseq, mfrag, fseq,
+            rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_);
+
+        rcvwnd_.rcv_ = fseq;
+      } else {
+        rc = NCSCC_RC_FAILURE;
+        // msg loss: the window is left untouched, the failure is reported
+        m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
+            "RcvData[mseq:%u, mfrag:%u, fseq:%u], "
+            "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "], "
+            "Error[msg loss]",
+            id_.node, id_.ref,
+            mseq, mfrag, fseq,
+            rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_);
+      }
+    }
+    if (fseq <= rcvwnd_.acked_) {
+      rc = NCSCC_RC_FAILURE;
+      // unexpected retransmission of a msg that has already been acked
+      m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
+          "RcvData[mseq:%u, mfrag:%u, fseq:%u], "
+          "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "], "
+          "Error[unexpected retransmission]",
+          id_.node, id_.ref,
+          mseq, mfrag, fseq,
+          rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_);
+    }
+  }
+  return rc;
+}
+
+void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) {
+  // accept only an ack inside the window: newer than acked_, older than send_
+  if (sndwnd_.acked_ < fseq && fseq < sndwnd_.send_) {
+    m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
+        "RcvChkAck[fseq:%u, chunk:%u], "
+        "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "], "
+        "queue[size:%" PRIu64 "]",
+        id_.node, id_.ref,
+        fseq, chksize,
+        sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_,
+        sndqueue_.Size());
+
+    // fast forward the sndwnd_.acked_ sequence to fseq
+    sndwnd_.acked_ = fseq;
+
+    // remove the @chksize messages ending at fseq out of sndqueue_ and
+    // decrease the nacked_space_ of sender by their total length
+    sndwnd_.nacked_space_ -= sndqueue_.Erase(fseq - chksize + 1, fseq);
+  } else {
+    m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
+        "RcvChkAck[fseq:%u, chunk:%u], "
+        "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "], "
+        "queue[size:%" PRIu64 "], "
+        "Error[msg disordered]",
+        id_.node, id_.ref,
+        fseq, chksize,
+        sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_,
+        sndqueue_.Size());
+  }
+}
+
+void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag,
+    uint16_t fseq) {
+  DataMessage* pending = sndqueue_.Find(mseq, mfrag);
+  if (pending == nullptr) {
+    m_MDS_LOG_ERR("FCTRL: [me] --> [node:%x, ref:%u], "
+        "RsndData[mseq:%u, mfrag:%u, fseq:%u], "
+        "Error[msg not found]",
+        id_.node, id_.ref,
+        mseq, mfrag, fseq);
+    return;
+  }
+  // still in sndqueue_: send the stored copy once more
+  Send(pending->msg_data_, pending->header_.msg_len_);
+  m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
+      "RsndData[mseq:%u, mfrag:%u, fseq:%u], "
+      "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
+      id_.node, id_.ref,
+      mseq, mfrag, fseq,
+      sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
+}
+
+}  // end namespace mds
diff --git a/src/mds/mds_tipc_fctrl_portid.h b/src/mds/mds_tipc_fctrl_portid.h
new file mode 100644
index 0000000..8068e6e
--- /dev/null
+++ b/src/mds/mds_tipc_fctrl_portid.h
@@ -0,0 +1,87 @@
+/*      -*- OpenSAF  -*-
+ *
+ * (C) Copyright 2019 The OpenSAF Foundation
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed
+ * under the GNU Lesser General Public License Version 2.1, February 1999.
+ * The complete license can be accessed from the following location:
+ * http://opensource.org/licenses/lgpl-license.php
+ * See the Copying file included with the OpenSAF distribution for full
+ * licensing terms.
+ *
+ * Author(s): Ericsson AB
+ *
+ */
+
+#ifndef MDS_MDS_TIPC_FCTRL_PORTID_H_
+#define MDS_MDS_TIPC_FCTRL_PORTID_H_
+
+#include <linux/tipc.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <deque>
+#include "mds/mds_tipc_fctrl_msg.h"
+
+namespace mds {
+
+class MessageQueue {
+ public:
+  void Queue(DataMessage* msg);  // append @msg at the back
+  DataMessage* Find(uint32_t mseq, uint16_t mfrag);  // nullptr if not queued
+  uint64_t Erase(uint16_t fseq_from, uint16_t fseq_to);  // sum of msg_len_
+  uint64_t Size() const { return queue_.size(); }
+  void Clear();  // delete and remove every queued message
+ private:
+  std::deque<DataMessage*> queue_;  // entries are deleted by Erase()/Clear()
+};
+
+class TipcPortId {
+ public:
+  TipcPortId(struct tipc_portid id, int sock, uint16_t chunk_size,
+      uint64_t sock_buf_size);
+  ~TipcPortId();
+  static uint64_t GetUniqueId(struct tipc_portid id);  // (node << 32) | ref
+  int GetSock() const { return bsrsock_; }
+  uint16_t GetCurrentSeq() { return sndwnd_.send_; }  // next seq to be sent
+  bool ReceiveCapable(uint16_t sending_len);
+  void ReceiveChunkAck(uint16_t fseq, uint16_t chunk_size);
+  void SendChunkAck(uint16_t fseq, uint16_t svc_id, uint16_t chunk_size);
+  uint32_t ReceiveData(uint32_t mseq, uint16_t mfrag,
+      uint16_t fseq, uint16_t svc_id);
+  void ReceiveNack(uint32_t mseq, uint16_t mfrag, uint16_t fseq);
+  uint32_t Send(uint8_t* data, uint16_t length);  // sendto() the peer portid
+  uint32_t Queue(const uint8_t* data, uint16_t length);  // copy into sndqueue_
+
+  uint16_t svc_cnt_{1};  // number of service subscribed on this portid
+
+ private:
+  struct tipc_portid id_;
+  int bsrsock_;  // tipc socket to send/receive data per tipc_portid
+  uint16_t chunk_size_{5};  // number of received msgs that triggers one ack
+  uint64_t rcv_buf_size_{0};  // estimated buffer size at receiver
+
+  struct sndwnd {
+    // sender sequence window
+    uint16_t acked_{0};  // last sequence has been acked by receiver
+    uint16_t send_{1};   // next sequence to be sent
+    uint64_t nacked_space_{0};  // total bytes are sent but not acked
+  };
+  struct sndwnd sndwnd_;
+
+  struct rcvwnd {
+    // receiver sequence window
+    uint16_t acked_{0};  // last sequence has been acked to sender
+    uint16_t rcv_{0};    // last sequence has been received
+    uint64_t nacked_space_{0};  // total bytes has not been acked
+  };
+  struct rcvwnd rcvwnd_;
+
+  MessageQueue sndqueue_;  // copies of sent msgs, kept until they are acked
+};
+
+}  // end namespace mds
+#endif  // MDS_MDS_TIPC_FCTRL_PORTID_H_
-- 
2.7.4



_______________________________________________
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel

Reply via email to