Hi Minh,

see one comment below. /Thanks Hans

On 2019-08-23 03:48, Minh Hon Chau wrote:
> Hi Hans,
>
> Thanks for your time to review the patch, please see my replies below 
> your comments.
>
> Regards,
>
> Minh
>
> On 22/8/19 11:07 pm, Hans Nordebäck wrote:
>> Hi Minh,
>>
>> It is a large patch, so I have to review it in parts. Below are my
>> comments, marked with [HansN], for the files:
>>
>> src/mds/Makefile.am
>> src/mds/mds_dt.h
>> src/mds/mds_dt_tipc.c
>>
>> I'll continue with the rest of the files a bit later. /Thanks Hans
>>
>> On 2019-08-14 08:38, Minh Chau wrote:
>>> This is a collaborative patch by two participants: Thuan and Minh.
>>>
>>> Main changes:
>>> - Add mds_tipc_fctrl_intf.h, mds_tipc_fctrl_intf.cc: These two files
>>> introduce new functions which are called in mds_dt_tipc.c if the flow
>>> control is enabled
>>> - Add mds_tipc_fctrl_portid.h, mds_tipc_fctrl_portid.cc: These files
>>> implement the tipc portid instance, which supports the sliding window
>>> and the mds msg queue
>>> - Add mds_tipc_fctrl_msg.h, mds_tipc_fctrl_msg.cc: These files define
>>> the event and messages which are used for this solution.
>>> ---
>>>    src/mds/Makefile.am              |  10 +-
>>>    src/mds/mds_dt.h                 |   8 +-
>>>    src/mds/mds_dt_tipc.c            | 188 +++++++++++++-------
>>>    src/mds/mds_tipc_fctrl_intf.cc   | 376 
>>> +++++++++++++++++++++++++++++++++++++++
>>>    src/mds/mds_tipc_fctrl_intf.h    |  47 +++++
>>>    src/mds/mds_tipc_fctrl_msg.cc    | 142 +++++++++++++++
>>>    src/mds/mds_tipc_fctrl_msg.h     | 129 ++++++++++++++
>>>    src/mds/mds_tipc_fctrl_portid.cc | 261 +++++++++++++++++++++++++++
>>>    src/mds/mds_tipc_fctrl_portid.h  |  87 +++++++++
>>>    9 files changed, 1184 insertions(+), 64 deletions(-)
>>>    create mode 100644 src/mds/mds_tipc_fctrl_intf.cc
>>>    create mode 100644 src/mds/mds_tipc_fctrl_intf.h
>>>    create mode 100644 src/mds/mds_tipc_fctrl_msg.cc
>>>    create mode 100644 src/mds/mds_tipc_fctrl_msg.h
>>>    create mode 100644 src/mds/mds_tipc_fctrl_portid.cc
>>>    create mode 100644 src/mds/mds_tipc_fctrl_portid.h
>>>
>>> diff --git a/src/mds/Makefile.am b/src/mds/Makefile.am
>>> index 2d7b652..d849e8f 100644
>>> --- a/src/mds/Makefile.am
>>> +++ b/src/mds/Makefile.am
>>> @@ -48,10 +48,16 @@ lib_libopensaf_core_la_SOURCES += \
>>>    if ENABLE_TIPC_TRANSPORT
>>>    noinst_HEADERS += src/mds/mds_dt_tipc.h \
>>>        src/mds/mds_tipc_recvq_stats.h \
>>> -    src/mds/mds_tipc_recvq_stats_impl.h
>>> +    src/mds/mds_tipc_recvq_stats_impl.h \
>>> +    src/mds/mds_tipc_fctrl_intf.h \
>>> +    src/mds/mds_tipc_fctrl_portid.h \
>>> +    src/mds/mds_tipc_fctrl_msg.h
>>>    lib_libopensaf_core_la_SOURCES += src/mds/mds_dt_tipc.c \
>>>        src/mds/mds_tipc_recvq_stats.cc \
>>> -    src/mds/mds_tipc_recvq_stats_impl.cc
>>> +    src/mds/mds_tipc_recvq_stats_impl.cc \
>>> +    src/mds/mds_tipc_fctrl_intf.cc \
>>> +    src/mds/mds_tipc_fctrl_portid.cc \
>>> +    src/mds/mds_tipc_fctrl_msg.cc
>>>    endif
>>>       if ENABLE_TESTS
>>> diff --git a/src/mds/mds_dt.h b/src/mds/mds_dt.h
>>> index b645bb4..d9e8633 100644
>>> --- a/src/mds/mds_dt.h
>>> +++ b/src/mds/mds_dt.h
>>> @@ -162,7 +162,7 @@ uint32_t mdtm_del_from_ref_tbl(MDS_SUBTN_REF_VAL 
>>> ref);
>>>    uint32_t mds_tmr_mailbox_processing(void);
>>>    uint32_t mdtm_get_from_ref_tbl(MDS_SUBTN_REF_VAL ref, MDS_SVC_HDL 
>>> *svc_hdl);
>>>    uint32_t mdtm_add_frag_hdr(uint8_t *buf_ptr, uint16_t len, 
>>> uint32_t seq_num,
>>> -                           uint16_t frag_byte);
>>> +                           uint16_t frag_byte, uint16_t 
>>> fctrl_seq_num);
>>>    uint32_t mdtm_free_reassem_msg_mem(MDS_ENCODED_MSG *msg);
>>>    uint32_t mdtm_process_recv_data(uint8_t *buf, uint16_t len, 
>>> uint64_t tipc_id,
>>>                                    uint32_t *buff_dump);
>>> @@ -240,9 +240,13 @@ bool mdtm_mailbox_mbx_cleanup(NCSCONTEXT arg, 
>>> NCSCONTEXT msg);
>>>       #define MDS_PROT 0xA0
>>>    #define MDS_VERSION 0x08
>>> -#define MDS_PROT_VER_MASK (MDS_PROT | MDS_VERSION)
>>> +#define MDS_PROT_VER_MASK 0xFC
>>>    #define MDTM_PRI_MASK 0x3
>>>    +/* MDS protocol/version for flow control */
>>> +#define MDS_PROT_FCTRL (0xB0 | MDS_VERSION)
>>> +#define MDS_PROT_FCTRL_ID 0x00AC13F5
>>> +
>>>    /* Added for the subscription changes */
>>>    #define MDS_NCS_CHASSIS_ID (m_NCS_GET_NODE_ID & 0x00ff0000)
>>>    #define MDS_TIPC_COMMON_ID 0x01001000
>>> diff --git a/src/mds/mds_dt_tipc.c b/src/mds/mds_dt_tipc.c
>>> index 86b52bb..fef1c50 100644
>>> --- a/src/mds/mds_dt_tipc.c
>>> +++ b/src/mds/mds_dt_tipc.c
>>> @@ -47,6 +47,7 @@
>>>    #include "mds_dt_tipc.h"
>>>    #include "mds_dt_tcp_disc.h"
>>>    #include "mds_core.h"
>>> +#include "mds_tipc_fctrl_intf.h"
>>>    #include "mds_tipc_recvq_stats.h"
>>>    #include "base/osaf_utility.h"
>>>    #include "base/osaf_poll.h"
>>> @@ -165,20 +166,22 @@ NCS_PATRICIA_TREE mdtm_reassembly_list;
>>>    uint32_t mdtm_global_frag_num;
>>>       const unsigned int MAX_RECV_THRESHOLD = 30;
>>> +uint8_t gl_mds_pro_ver = MDS_PROT | MDS_VERSION;
>>>    -static bool get_tipc_port_id(int sock, uint32_t* port_id) {
>>> +static bool get_tipc_port_id(int sock, struct tipc_portid* port_id) {
>>>        struct sockaddr_tipc addr;
>>>        socklen_t sz = sizeof(addr);
>>>           memset(&addr, 0, sizeof(addr));
>>> -    *port_id = 0;
>>> +    port_id->node = 0;
>>> +    port_id->ref = 0;
>>>        if (0 > getsockname(sock, (struct sockaddr *)&addr, &sz)) {
>>>            syslog(LOG_ERR, "MDTM:TIPC Failed to get socket name, 
>>> err: %s",
>>>                   strerror(errno));
>>>            return false;
>>>        }
>>>    -    *port_id = addr.addr.id.ref;
>>> +    *port_id = addr.addr.id;
>>>        return true;
>>>    }
>>>    @@ -240,12 +243,13 @@ uint32_t mdtm_tipc_init(NODE_ID nodeid, 
>>> uint32_t *mds_tipc_ref)
>>>        }
>>>           /* Code for getting the self tipc random number */
>>> -    if (!get_tipc_port_id(tipc_cb.BSRsock, mds_tipc_ref)) {
>>> +    struct tipc_portid port_id;
>> [HansN] mds_tipc_ref was previously set to 0 in get_tipc_port_id; this
>> is now missing.
> [Minh] Now port_id.ref is set to 0 in the new get_tipc_port_id(), and
> it is then copied back to mds_tipc_ref.
[HansN] If get_tipc_port_id returns false, mds_tipc_ref is not set when
returning NCSCC_RC_FAILURE.
>>> +    if (!get_tipc_port_id(tipc_cb.BSRsock, &port_id)) {
>>>            close(tipc_cb.Dsock);
>>>            close(tipc_cb.BSRsock);
>>>            return NCSCC_RC_FAILURE;
>>>        }
>>> -
>>> +    *mds_tipc_ref = port_id.ref;
>>>        tipc_cb.adest = ((uint64_t)(nodeid)) << 32;
>>>        tipc_cb.adest |= *mds_tipc_ref;
>>>        tipc_cb.node_id = nodeid;
>>> @@ -325,6 +329,23 @@ uint32_t mdtm_tipc_init(NODE_ID nodeid, 
>>> uint32_t *mds_tipc_ref)
>>>            mdtm_set_transport(MDTM_TX_TYPE_TIPC);
>>>        }
>>>    +    /* Get tipc socket receive buffer size */
>> [HansN] int optval = 0;
>>> +    int optval;
>>> +    socklen_t optlen = sizeof(optval);
>>> +    if (getsockopt(tipc_cb.BSRsock, SOL_SOCKET, SO_RCVBUF,
>>> +        &optval, &optlen) != 0) {
>>> +        syslog(LOG_ERR, "MDTM: getsockopt() failed to get rcv buf 
>>> size to: %str",
>>> +            strerror(errno));
>>> +        close(tipc_cb.Dsock);
>>> +        close(tipc_cb.BSRsock);
>>> +        return NCSCC_RC_FAILURE;
>>> +    }
>>> +
>>> +    /* Create flow control tasks if enabled*/
>>> +    gl_mds_pro_ver = MDS_PROT_FCTRL;
>> [HansN] A question: optval is an int (receive buffer size) and is cast to
>> uint64_t (the cast is not needed) when passed to mds_tipc_fctrl_initialize.
>> Why use different types, int and uint64_t?
> [Minh] a mistake, int should be fine.
>>> + mds_tipc_fctrl_initialize(tipc_cb.BSRsock, port_id,
>>> +        (uint64_t)optval, tipc_mcast_enabled);
>>> +
>>>        /* Create a task to receive the events and data */
>>>        if (mdtm_create_rcv_task(tipc_cb.hdle_mdtm) != 
>>> NCSCC_RC_SUCCESS) {
>>>            syslog(LOG_ERR,
>>> @@ -469,7 +490,7 @@ uint32_t mdtm_tipc_destroy(void)
>>>                 NULL);
>>>        m_NCS_IPC_RELEASE(&tipc_cb.tmr_mbx,
>>>                  (NCS_IPC_CB)mdtm_mailbox_mbx_cleanup);
>>> -
>>> +    mds_tipc_fctrl_shutdown();
>>>        /* Clear reference hdl list */
>>>        while (mdtm_ref_hdl_list_hdr != NULL) {
>>>            /* Store temporary the pointer of entry to be deleted */
>>> @@ -672,36 +693,26 @@ ssize_t recvfrom_connectionless(int sd, void 
>>> *buf, size_t nbytes, int flags,
>>>                                       0));
>>>                        if (anc_data[0] == TIPC_ERR_OVERLOAD) {
>>>                            LOG_ER(
>>> -                            "MDTM: From <0x%"PRIx32 ":%"PRIu32 "> 
>>> undeliverable message condition ancillary data: TIPC_ERR_OVERLOAD",
>>> -                            ((struct sockaddr_tipc*) 
>>> from)->addr.id.node,
>>> -                            ((struct sockaddr_tipc*) 
>>> from)->addr.id.ref);
>>> -                        m_MDS_LOG_CRITICAL(
>>>                                "MDTM: From <0x%"PRIx32 ":%"PRIu32 "> 
>>> undeliverable message condition ancillary data: TIPC_ERR_OVERLOAD ",
>>>                                ((struct sockaddr_tipc*) 
>>> from)->addr.id.node,
>>>                                ((struct sockaddr_tipc*) 
>>> from)->addr.id.ref);
>>> -                    } else {
>>> +                    } else if (anc_data[0] == TIPC_ERR_NO_PORT){
>>>                            /* TIPC_ERRINFO - TIPC error
>>>                             * code associated with a
>>>                             * returned data message or a
>>>                             * connection termination
>>>                             * message */
>>> -                        m_MDS_LOG_DBG(
>>> -                            "MDTM: undelivered message condition 
>>> ancillary data: TIPC_ERRINFO err : %d",
>>> -                            anc_data[0]);
>>> + mds_tipc_fctrl_portid_terminate(((struct 
>>> sockaddr_tipc*)from)->addr.id);
>>> +                    } else {
>>> +                        m_MDS_LOG_ERR("MDTM:: TIPC_ERRINFO 
>>> anc_data[0]:%u", anc_data[0]);
>>>                        }
>>>                    } else if (anc->cmsg_type == TIPC_RETDATA) {
>>> -                    /* If we set TIPC_DEST_DROPPABLE off
>>> -                       message (configure TIPC to return
>>> -                       rejected messages to the sender ) we
>>> -                       will hit this when we implement MDS
>>> -                       retransmit lost messages, can be
>>> -                       replaced with flow control logic */
>>>                        /* TIPC_RETDATA -The contents of a
>>>                         * returned data message */
>>> -                    LOG_IN(
>>> -                        "MDTM: undelivered message condition 
>>> ancillary data: TIPC_RETDATA");
>>> -                    m_MDS_LOG_INFO(
>>> -                        "MDTM: undelivered message condition 
>>> ancillary data: TIPC_RETDATA");
>>> +                    LOG_IN("MDTM: undelivered message condition 
>>> ancillary data: TIPC_RETDATA");
>>> +                    uint16_t ret_msg_len = anc->cmsg_len - 
>>> sizeof(*anc);
>>> +                    unsigned char *ret_msg = CMSG_DATA(anc);
>>> +                    mds_tipc_fctrl_drop_data(ret_msg, ret_msg_len, 
>>> ((struct sockaddr_tipc*)from)->addr.id);
>>>                    } else if (anc->cmsg_type == TIPC_DESTNAME) {
>>>                        if (sz == 0) {
>>>                            m_MDS_LOG_DBG(
>>> @@ -822,7 +833,7 @@ static uint32_t mdtm_process_recv_events(void)
>>>                                    m_MDS_LOG_INFO(
>>>                                        "MDTM: Published: ");
>>>                                    m_MDS_LOG_INFO(
>>> -                                    "MDTM:  <%u,%u,%u> port id 
>>> <0x%08x:%u>\n",
>>> +                                    "MDTM:  <%x,%x,%x> port id 
>>> <0x%08x:%u>\n",
>>>                                        NTOHL(
>>>                                        event.s
>>>                                            .seq
>>> @@ -841,7 +852,12 @@ static uint32_t mdtm_process_recv_events(void)
>>>                                        event
>>>                                            .port
>>>                                            .ref));
>>> -
>>> +                                struct tipc_portid id = {
>>> +                                    .node = NTOHL(event.port.node),
>>> +                                    .ref = NTOHL(event.port.ref)
>>> +                                };
>>> +                                uint32_t type = 
>>> NTOHL(event.s.seq.type);
>>> +                                mds_tipc_fctrl_portid_up(id, type);
>>>                                    if (NCSCC_RC_SUCCESS !=
>>> mdtm_process_discovery_events(
>>>                                        TIPC_PUBLISHED,
>>> @@ -873,7 +889,12 @@ static uint32_t mdtm_process_recv_events(void)
>>>                                        event
>>>                                            .port
>>>                                            .ref));
>>> -
>>> +                                struct tipc_portid id = {
>>> +                                    .node = NTOHL(event.port.node),
>>> +                                    .ref = NTOHL(event.port.ref)
>>> +                                };
>>> +                                uint32_t type = 
>>> NTOHL(event.s.seq.type);
>>> + mds_tipc_fctrl_portid_down(id, type);
>>>                                    if (NCSCC_RC_SUCCESS !=
>>> mdtm_process_discovery_events(
>>>                                        TIPC_WITHDRAWN,
>>> @@ -986,8 +1007,6 @@ static uint32_t mdtm_process_recv_events(void)
>>>                            break;
>>>                        }
>>>                        if (recd_bytes == 0) {
>>> -                        m_MDS_LOG_DBG(
>>> -                            "MDTM: recd bytes=0 on received on 
>>> sock, abnormal/unknown/hack  condition. Ignoring");
>>>                            break;
>>>                        }
>>>                        data = inbuf;
>>> @@ -1076,9 +1095,10 @@ static uint32_t mdtm_process_recv_events(void)
>>>                                    &buff_dump);
>> [HansN] Flow control does not seem to be supported if
>> MDS_CHECKSUM_ENABLE_FLAG is set? Either we document that flow control is
>> not supported if MDS_CHECKSUM_ENABLE_FLAG is set,
>>
>> or we support it?
> [Minh] At the moment we do not support MDS_CHECKSUM_ENABLE_FLAG. So
> far no applications seem to set it, but we can support it in the
> future by stepping up the MDS version. This will be
> documented.
>>
>>>                            }
>>>    #else
>>> -                        mdtm_process_recv_data(
>>> -                            &inbuf[2], recd_bytes - 2,
>>> -                            tipc_id, &buff_dump);
>>> +                        if (mds_tipc_fctrl_rcv_data(inbuf, 
>>> recd_bytes, client_addr.addr.id)
>>> +                            == NCSCC_RC_SUCCESS) {
>>> + mdtm_process_recv_data(&inbuf[2], recd_bytes - 2, tipc_id, 
>>> &buff_dump);
>>> +                        }
>>>    #endif
>>>                        } else {
>>>                            uint64_t tipc_id;
>>> @@ -1873,9 +1893,9 @@ uint32_t mds_mdtm_svc_install_tipc(PW_ENV_ID 
>>> pwe_id, MDS_SVC_ID svc_id,
>>>        server_addr.addr.nameseq.upper = server_inst;
>>>           /* The self tipc random port number */
>>> -    uint32_t port_id = 0;
>>> +  struct tipc_portid port_id;
>>>        get_tipc_port_id(tipc_cb.BSRsock, &port_id);
>>> -    m_MDS_LOG_NOTIFY("MDTM: install_tipc : <p:%u,s:%u,i:%u>", port_id,
>>> +    m_MDS_LOG_NOTIFY("MDTM: install_tipc : <p:%u,s:%u,i:%u>", 
>>> port_id.ref,
>>>                 server_type, server_inst);
>>>           if (0 != bind(tipc_cb.BSRsock, (struct sockaddr 
>>> *)&server_addr,
>>> @@ -2495,6 +2515,7 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ *req)
>>>         */
>>>        uint32_t status = 0;
>>>        uint32_t sum_mds_hdr_plus_mdtm_hdr_plus_len;
>> [HansN] indentation is not correct
> [Minh] Will fix.
>>> +  uint16_t fctrl_seq_num = 0;
>>>        int version = req->msg_arch_word & 0x7;
>>>        if (version > 1) {
>>>            sum_mds_hdr_plus_mdtm_hdr_plus_len =
>>> @@ -2583,11 +2604,16 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ *req)
>>>                    mdtm_add_mds_hdr(buffer_ack, req)) {
>>>                    return NCSCC_RC_FAILURE;
>>>                }
>>> -
>>> +          /* if sndqueue is capable, then obtain the current 
>>> sending seq */
>>> +          if (mds_tipc_fctrl_sndqueue_capable(tipc_id, len, 
>>> &fctrl_seq_num)
>>> +              == NCSCC_RC_FAILURE){
>>> +            m_MDS_LOG_ERR("FCTRL: Failed to send message len :%d", 
>>> len);
>>> +            return NCSCC_RC_FAILURE;
>>> +          }
>>>                /* Add frag_hdr */
>>>                if (NCSCC_RC_SUCCESS !=
>>>                    mdtm_add_frag_hdr(buffer_ack, len, frag_seq_num,
>>> -                          0)) {
>>> +                          0, fctrl_seq_num)) {
>>>                    return NCSCC_RC_FAILURE;
>>>                }
>>>    @@ -2676,13 +2702,22 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ 
>>> *req)
>>>                        free(body);
>>>                        return NCSCC_RC_FAILURE;
>>>                    }
>>> +                /* if sndqueue is capable, then obtain the current 
>>> sending seq */
>>> +                if (mds_tipc_fctrl_sndqueue_capable(tipc_id,
>>> +                    len + sum_mds_hdr_plus_mdtm_hdr_plus_len,
>>> +                    &fctrl_seq_num) == NCSCC_RC_FAILURE){
>>> +                    m_MDS_LOG_ERR("FCTRL: Failed to send message 
>>> len :%d",
>>> +                        len + sum_mds_hdr_plus_mdtm_hdr_plus_len);
>>> +                    free(body);
>>> +                    return NCSCC_RC_FAILURE;
>>> +                }
>>>                       if (NCSCC_RC_SUCCESS !=
>>>                        mdtm_add_frag_hdr(
>>>                        body,
>>>                        (len +
>>>                         sum_mds_hdr_plus_mdtm_hdr_plus_len),
>>> -                    frag_seq_num, 0)) {
>>> +                    frag_seq_num, 0, fctrl_seq_num)) {
>>>                        m_MDS_LOG_ERR(
>>>                            "MDTM: Unable to add the frag Hdr to the 
>>> send msg\n");
>>>                        m_MMGR_FREE_BUFR_LIST(usrbuf);
>>> @@ -2778,12 +2813,23 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ *req)
>>>                        req->msg.data.buff_info.buff);
>>>                    return NCSCC_RC_FAILURE;
>>>                }
>>> +            /* if sndqueue is capable, then obtain the current 
>>> sending seq */
>>> +            if (mds_tipc_fctrl_sndqueue_capable(tipc_id,
>>> +                req->msg.data.buff_info.len + 
>>> sum_mds_hdr_plus_mdtm_hdr_plus_len,
>>> +                &fctrl_seq_num) == NCSCC_RC_FAILURE) {
>>> +                m_MDS_LOG_ERR("FCTRL: Failed to send message len :%d",
>>> +                    req->msg.data.buff_info.len + 
>>> sum_mds_hdr_plus_mdtm_hdr_plus_len);
>>> +                free(body);
>>> + mds_free_direct_buff(req->msg.data.buff_info.buff);
>>> +                return NCSCC_RC_FAILURE;
>>> +            }
>>> +
>>>                if (NCSCC_RC_SUCCESS !=
>>>                    mdtm_add_frag_hdr(
>>>                    body,
>>>                    req->msg.data.buff_info.len +
>>>                        sum_mds_hdr_plus_mdtm_hdr_plus_len,
>>> -                frag_seq_num, 0)) {
>>> +                frag_seq_num, 0, fctrl_seq_num)) {
>>>                    m_MDS_LOG_ERR(
>>>                        "MDTM: Unable to add the frag Hdr to the send 
>>> msg\n");
>>>                    free(body);
>>> @@ -2860,6 +2906,7 @@ uint32_t mdtm_frag_and_send(MDTM_SEND_REQ 
>>> *req, uint32_t seq_num,
>>>        uint16_t i = 1;
>>>        uint16_t frag_val = 0;
>>>        uint32_t sum_mds_hdr_plus_mdtm_hdr_plus_len;
>>> +    uint16_t fctrl_seq_num = 0;
>>>        int version = req->msg_arch_word & 0x7;
>>>        uint32_t ret = NCSCC_RC_SUCCESS;
>>>    @@ -2938,9 +2985,18 @@ uint32_t mdtm_frag_and_send(MDTM_SEND_REQ 
>>> *req, uint32_t seq_num,
>>>                        return NCSCC_RC_FAILURE;
>>>                    }
>>>                }
>>> +            /* if sndqueue is capable, then obtain the current 
>>> sending seq */
>>> +            if (mds_tipc_fctrl_sndqueue_capable(id, len_buf, 
>>> &fctrl_seq_num)
>>> +                == NCSCC_RC_FAILURE) {
>>> +                m_MDS_LOG_ERR("FCTRL: Failed to send message len 
>>> :%d", len_buf);
>>> +                m_MMGR_FREE_BUFR_LIST(usrbuf);
>>> +                free(body);
>>> +                return NCSCC_RC_FAILURE;
>>> +            }
>>> +
>>>                if (NCSCC_RC_SUCCESS !=
>>>                    mdtm_add_frag_hdr(body, len_buf, seq_num,
>>> -                          frag_val)) {
>>> +                          frag_val, fctrl_seq_num)) {
>>>                    m_MDS_LOG_ERR(
>>>                        "MDTM: Frag hde addition failed\n");
>>>                    m_MMGR_FREE_BUFR_LIST(usrbuf);
>>> @@ -2996,7 +3052,7 @@ uint32_t mdtm_frag_and_send(MDTM_SEND_REQ 
>>> *req, uint32_t seq_num,
>>>    *********************************************************/
>>>       uint32_t mdtm_add_frag_hdr(uint8_t *buf_ptr, uint16_t len, 
>>> uint32_t seq_num,
>>> -               uint16_t frag_byte)
>>> +               uint16_t frag_byte, uint16_t fctrl_seq_num)
>>>    {
>>>        /* Add the FRAG HDR to the Buffer */
>>>        uint8_t *data;
>>> @@ -3013,9 +3069,17 @@ uint32_t mdtm_add_frag_hdr(uint8_t *buf_ptr, 
>>> uint16_t len, uint32_t seq_num,
>>>                        5); /* 2 bytes for keeping len, to cross
>>>                           check at the receiver end */
>>>    #else
>>> -    ncs_encode_16bit(&data, len - MDTM_FRAG_HDR_LEN -
>>> -                    2); /* 2 bytes for keeping len, to cross
>>> -                       check at the receiver end */
>>> +    /* Next 2 bytes for keeping len, to cross check at the receiver 
>>> end.
>>> +     * Used to be encoded as below:
>>> +     *
>>> +     * ncs_encode_16bit(&data, len - MDTM_FRAG_HDR_LEN - 2);
>>> +     *
>>> +     * However, these 2 bytes have been never examined at receiver. 
>>> As backward
>>> +     * compatibility, the fragment header and mds data header can't 
>>> be extended,
>>> +     * hereafter, these 2 bytes will be used as sequence number in 
>>> flow control
>>> +     * (per tipc portid)
>>> +     * */
>>> +    ncs_encode_16bit(&data, fctrl_seq_num);
>>>    #endif
>>>        return NCSCC_RC_SUCCESS;
>>>    }
>>> @@ -3060,21 +3124,25 @@ static uint32_t mdtm_sendto(uint8_t *buffer, 
>>> uint16_t buff_len,
>>>            buffer[4] = checksum;
>>>        }
>>>    #endif
>>> -
>>> -    send_len = sendto(tipc_cb.BSRsock, buffer, buff_len, 0,
>>> -              (struct sockaddr *)&server_addr, sizeof(server_addr));
>>> -    if (send_len == buff_len) {
>>> -        m_MDS_LOG_INFO("MDTM: Successfully sent message");
>>> -        return NCSCC_RC_SUCCESS;
>>> -    } else if (send_len == -1) {
>>> -        m_MDS_LOG_ERR("MDTM: Failed to send message err :%s",
>>> -                  strerror(errno));
>>> -        return NCSCC_RC_FAILURE;
>>> -    } else {
>>> -        m_MDS_LOG_ERR("MDTM: Failed to send message send_len :%zd",
>>> -                  send_len);
>>> -        return NCSCC_RC_FAILURE;
>> [HansN] NCSCC_RC_SUCCESS is returned even if no message has been sent,
>> because the return code from mds_tipc_fctrl_trysend is only checked for
>> NCSCC_RC_SUCCESS. Is an else statement missing?
> [Minh] The 'else' statement is not missing; this may be a bit clearer in
> patch 6/9. mds_tipc_fctrl_trysend() returns FAILURE either because the
> tipc port id is invalid or because the receiver's buffer-overflow flag is
> turned on. In both cases, mds won't call sendto() to send the msg out.
>>> +    if (mds_tipc_fctrl_trysend(buffer, buff_len, id) == 
>>> NCSCC_RC_SUCCESS) {
>>> +        send_len = sendto(tipc_cb.BSRsock, buffer, buff_len, 0,
>>> +                    (struct sockaddr *)&server_addr, 
>>> sizeof(server_addr));
>>> +        if (send_len == buff_len) {
>>> +            m_MDS_LOG_INFO("MDTM: Successfully sent message");
>>> +            return NCSCC_RC_SUCCESS;
>>> +        } else if (send_len == -1) {
>>> +            m_MDS_LOG_ERR("MDTM: Failed to send message err :%s", 
>>> strerror(errno));
>>> +            // todo: it's failed to send msg, if flow control is 
>>> enabled
>>> +            // the msg needs to mark unsent
>>> +            return NCSCC_RC_FAILURE;
>>> +        } else {
>>> +            m_MDS_LOG_ERR("MDTM: Failed to send message send_len 
>>> :%zd", send_len);
>>> +            // todo: it's failed to send msg, if flow control is 
>>> enabled
>>> +            // the msg needs to mark unsent
>>> +            return NCSCC_RC_FAILURE;
>>> +        }
>>>        }
>>> +    return NCSCC_RC_SUCCESS;
>>>    }
>>> /*********************************************************
>>> @@ -3103,12 +3171,12 @@ static uint32_t mdtm_mcast_sendto(void 
>>> *buffer, size_t size,
>>>        server_addr.addr.nameseq.lower = HTONL(MDS_MDTM_LOWER_INSTANCE);
>>>        /*This can be scope-down to dest_svc_id  server_inst TBD*/
>>>        server_addr.addr.nameseq.upper = HTONL(MDS_MDTM_UPPER_INSTANCE);
>>> -
>>>        int send_len =
>>>            sendto(tipc_cb.BSRsock, buffer, size, 0,
>>>               (struct sockaddr *)&server_addr, sizeof(server_addr));
>>> +
>>>        if (send_len == size) {
>>> -        m_MDS_LOG_INFO("MDTM: Successfully sent message");
>>> +        m_MDS_LOG_INFO("MDTM: Successfully sent mcast message");
>>>            return NCSCC_RC_SUCCESS;
>>>        } else {
>>>            m_MDS_LOG_ERR("MDTM: Failed to send Multicast message err 
>>> :%s",
>>> @@ -3151,7 +3219,7 @@ static uint32_t mdtm_add_mds_hdr(uint8_t 
>>> *buffer, MDTM_SEND_REQ *req)
>>>        uint8_t *ptr;
>>>        ptr = buffer;
>>>    -    prot_ver |= MDS_PROT | MDS_VERSION | ((uint8_t)(req->pri - 1));
>>> +    prot_ver |= gl_mds_pro_ver | ((uint8_t)(req->pri - 1));
>>>        enc_snd_type = (req->msg.encoding << 6);
>>>        enc_snd_type |= (uint8_t)req->snd_type;
>>>    diff --git a/src/mds/mds_tipc_fctrl_intf.cc 
>>> b/src/mds/mds_tipc_fctrl_intf.cc
>>> new file mode 100644
>>> index 0000000..91b9107
>>> --- /dev/null
>>> +++ b/src/mds/mds_tipc_fctrl_intf.cc
>>> @@ -0,0 +1,376 @@
>>> +/*      -*- OpenSAF  -*-
>>> + *
>>> + * (C) Copyright 2019 The OpenSAF Foundation
>>> + *
>>> + * This program is distributed in the hope that it will be useful, but
>>> + * WITHOUT ANY WARRANTY; without even the implied warranty of 
>>> MERCHANTABILITY
>>> + * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are 
>>> licensed
>>> + * under the GNU Lesser General Public License Version 2.1, 
>>> February 1999.
>>> + * The complete license can be accessed from the following location:
>>> + * 
>>> https://protect2.fireeye.com/url?k=a4ac81b5-f8252ede-a4acc12e-0cc47ad93ea2-93ec22754767d852&q=1&u=http%3A%2F%2Fopensource.org%2Flicenses%2Flgpl-license.php
>>> + * See the Copying file included with the OpenSAF distribution for 
>>> full
>>> + * licensing terms.
>>> + *
>>> + * Author(s): Ericsson AB
>>> + *
>>> + */
>>> +#include "mds/mds_tipc_fctrl_intf.h"
>>> +
>>> +#include <poll.h>
>>> +#include <pthread.h>
>>> +#include <stdio.h>
>>> +#include <sys/poll.h>
>>> +#include <unistd.h>
>>> +
>>> +#include <map>
>>> +#include <mutex>
>>> +
>>> +#include "base/ncssysf_def.h"
>>> +#include "base/ncssysf_tsk.h"
>>> +
>>> +#include "mds/mds_log.h"
>>> +#include "mds/mds_tipc_fctrl_portid.h"
>>> +#include "mds/mds_tipc_fctrl_msg.h"
>>> +
>>> +using mds::Event;
>>> +using mds::TipcPortId;
>>> +using mds::DataMessage;
>>> +using mds::ChunkAck;
>>> +using mds::HeaderMessage;
>>> +
>>> +namespace {
>>> +// multicast/broadcast enabled
>>> +// todo: to be removed if flow control support it
>>> +bool is_mcast_enabled = true;
>>> +
>>> +// mailbox handles for events
>>> +SYSF_MBX mbx_events;
>>> +
>>> +// ncs task handle
>>> +NCSCONTEXT p_task_hdl = nullptr;
>>> +
>>> +// data socket associated with port id
>>> +int data_sock_fd = 0;
>>> +struct tipc_portid snd_rcv_portid;
>>> +
>>> +// socket receiver's buffer size
>>> +// todo: This buffer size is read from the local node as an estimated
>>> +// buffer size of the receiver sides, in facts that could be different
>>> +// at the receiver's sides (in the other nodes). At this moment, we
>>> +// assume all nodes in cluster have set the same tipc buffer size
>>> +uint64_t sock_buf_size = 0;
>>> +
>>> +// map of key:unique id(adest), value: TipcPortId instance
>>> +// unique id is derived from struct tipc_portid
>>> +std::map<uint64_t, TipcPortId*> portid_map;
>>> +std::mutex portid_map_mutex;
>>> +
>>> +// chunk ack parameters
>>> +// todo: The chunk ack size should be configurable
>>> +uint16_t kChunkAckSize = 3;
>>> +
>>> +TipcPortId* portid_lookup(struct tipc_portid id) {
>>> +  uint64_t uid = TipcPortId::GetUniqueId(id);
>>> +  if (portid_map.find(uid) == portid_map.end()) return nullptr;
>>> +  return portid_map[uid];
>>> +}
>>> +
>>> +uint32_t process_flow_event(const Event evt) {
>>> +  uint32_t rc = NCSCC_RC_SUCCESS;
>>> +  TipcPortId *portid = portid_lookup(evt.id_);
>>> +  if (portid == nullptr) {
>>> +    if (evt.type_ == Event::Type::kEvtRcvData) {
>>> +      portid = new TipcPortId(evt.id_, data_sock_fd, kChunkAckSize,
>>> +          sock_buf_size);
>>> +      portid_map[TipcPortId::GetUniqueId(evt.id_)] = portid;
>>> +      if (evt.type_ == Event::Type::kEvtRcvData) {
>>> +        rc = portid->ReceiveData(evt.mseq_, evt.mfrag_,
>>> +            evt.fseq_, evt.svc_id_);
>>> +      }
>>> +    }
>>> +  } else {
>>> +    if (evt.type_ == Event::Type::kEvtRcvData) {
>>> +      rc = portid->ReceiveData(evt.mseq_, evt.mfrag_,
>>> +          evt.fseq_, evt.svc_id_);
>>> +    }
>>> +    if (evt.type_ == Event::Type::kEvtRcvChunkAck) {
>>> +      portid->ReceiveChunkAck(evt.fseq_, evt.chunk_size_);
>>> +    }
>>> +    if (evt.type_ == Event::Type::kEvtSendChunkAck) {
>>> +      portid->SendChunkAck(evt.fseq_, evt.svc_id_, evt.chunk_size_);
>>> +    }
>>> +    if (evt.type_ == Event::Type::kEvtDropData) {
>>> +      portid->ReceiveNack(evt.mseq_, evt.mfrag_,
>>> +          evt.fseq_);
>>> +    }
>>> +  }
>>> +  return rc;
>>> +}
>>> +
>>> +uint32_t process_all_events(void) {
>>> +  enum { FD_FCTRL = 0, NUM_FDS };
>>> +
>>> +  int poll_tmo = MDTM_TIPC_POLL_TIMEOUT;
>>> +  while (true) {
>>> +    int pollres;
>>> +    struct pollfd pfd[NUM_FDS] = {{0}};
>>> +
>>> +    pfd[FD_FCTRL].fd =
>>> +        ncs_ipc_get_sel_obj(&mbx_events).rmv_obj;
>>> +    pfd[FD_FCTRL].events = POLLIN;
>>> +
>>> +    pollres = poll(pfd, NUM_FDS, poll_tmo);
>>> +
>>> +    if (pollres == -1) {
>>> +      if (errno == EINTR) continue;
>>> +      m_MDS_LOG_ERR("FCTRL: poll() failed:%s", strerror(errno));
>>> +      break;
>>> +    }
>>> +
>>> +    if (pollres > 0) {
>>> +      if (pfd[FD_FCTRL].revents == POLLIN) {
>>> +        Event *evt = reinterpret_cast<Event*>(ncs_ipc_non_blk_recv(
>>> +            &mbx_events));
>>> +
>>> +        if (evt == nullptr) continue;
>>> +
>>> +        portid_map_mutex.lock();
>>> +        process_flow_event(*evt);
>>> +        delete evt;
>>> +        portid_map_mutex.unlock();
>>> +      }
>>> +    }
>>> +  }  /* while */
>>> +  return NCSCC_RC_SUCCESS;
>>> +}
>>> +
>>> +uint32_t create_ncs_task(void *task_hdl) {
>>> +  int policy = SCHED_OTHER; /*root defaults */
>>> +  int max_prio = sched_get_priority_max(policy);
>>> +  int min_prio = sched_get_priority_min(policy);
>>> +  int prio_val = ((max_prio - min_prio) * 0.87);
>>> +
>>> +  if (m_NCS_IPC_CREATE(&mbx_events) != NCSCC_RC_SUCCESS) {
>>> +    m_MDS_LOG_ERR("m_NCS_IPC_CREATE failed");
>>> +    return NCSCC_RC_FAILURE;
>>> +  }
>>> +  if (m_NCS_IPC_ATTACH(&mbx_events) != NCSCC_RC_SUCCESS) {
>>> +    m_MDS_LOG_ERR("m_NCS_IPC_ATTACH failed");
>>> +    return NCSCC_RC_FAILURE;
>>> +  }
>>> +  if (ncs_task_create((NCS_OS_CB)process_all_events, 0,
>>> +          "OSAF_MDS", prio_val, policy, NCS_MDTM_STACKSIZE,
>>> +          &task_hdl) != NCSCC_RC_SUCCESS) {
>>> +    m_MDS_LOG_ERR("FCTRL: Task Creation-failed:\n");
>>> +    return NCSCC_RC_FAILURE;
>>> +  }
>>> +
>>> +  m_MDS_LOG_NOTIFY("FCTRL: Start process_all_events");
>>> +  return NCSCC_RC_SUCCESS;
>>> +}
>>> +
>>> +}  // end local namespace
>>> +
>>> +uint32_t mds_tipc_fctrl_initialize(int dgramsock, struct 
>>> tipc_portid id,
>>> +    uint64_t rcv_buf_size, bool mcast_enabled) {
>>> +  if (create_ncs_task(&p_task_hdl) !=
>>> +      NCSCC_RC_SUCCESS) {
>>> +    m_MDS_LOG_ERR("FCTRL: Start of the Created Task-failed:\n");
>>> +    return NCSCC_RC_FAILURE;
>>> +  }
>>> +  data_sock_fd = dgramsock;
>>> +  snd_rcv_portid = id;
>>> +  sock_buf_size = rcv_buf_size;
>>> +  is_mcast_enabled = mcast_enabled;
>>> +
>>> +  m_MDS_LOG_NOTIFY("FCTRL: Initialize [node:%x, ref:%u]",
>>> +      id.node, id.ref);
>>> +
>>> +  return NCSCC_RC_SUCCESS;
>>> +}
>>> +
>>> +uint32_t mds_tipc_fctrl_shutdown(void) {
>>> +  if (ncs_task_release(p_task_hdl) != NCSCC_RC_SUCCESS) {
>>> +    m_MDS_LOG_ERR("FCTRL: Stop of the Created Task-failed:\n");
>>> +  }
>>> +  return NCSCC_RC_SUCCESS;
>>> +}
>>> +
>>> +uint32_t mds_tipc_fctrl_sndqueue_capable(struct tipc_portid id, 
>>> uint16_t len,
>>> +          uint16_t* next_seq) {
>>> +  uint32_t rc = NCSCC_RC_SUCCESS;
>>> +
>>> +  portid_map_mutex.lock();
>>> +
>>> +  TipcPortId *portid = portid_lookup(id);
>>> +  if (portid == nullptr) {
>>> +    m_MDS_LOG_ERR("FCTRL: PortId not found [node:%x, ref:%u] line:%u",
>>> +        id.node, id.ref, __LINE__);
>>> +    rc = NCSCC_RC_FAILURE;
>>> +  } else {
>>> +    // assign the sequence number of the outgoing message
>>> +    *next_seq = portid->GetCurrentSeq();
>>> +  }
>>> +
>>> +  portid_map_mutex.unlock();
>>> +
>>> +  return rc;
>>> +}
>>> +
>>> +uint32_t mds_tipc_fctrl_trysend(const uint8_t *buffer, uint16_t len,
>>> +    struct tipc_portid id) {
>>> +  uint32_t rc = NCSCC_RC_SUCCESS;
>>> +
>>> +  portid_map_mutex.lock();
>>> +
>>> +  TipcPortId *portid = portid_lookup(id);
>>> +  if (portid == nullptr) {
>>> +    m_MDS_LOG_ERR("FCTRL: PortId not found [node:%x, ref:%u] line:%u",
>>> +        id.node, id.ref, __LINE__);
>>> +    rc = NCSCC_RC_FAILURE;
>>> +  } else {
>>> +    portid->Queue(buffer, len);
>>> +  }
>>> +
>>> +  portid_map_mutex.unlock();
>>> +
>>> +  return rc;
>>> +}
>>> +
>>> +uint32_t mds_tipc_fctrl_portid_up(struct tipc_portid id, uint32_t 
>>> type) {
>>> +  MDS_SVC_ID svc_id = (uint16_t)(type & MDS_EVENT_MASK_FOR_SVCID);
>>> +
>>> +  portid_map_mutex.lock();
>>> +
>>> +  // Add this new tipc portid to the map
>>> +  TipcPortId *portid = portid_lookup(id);
>>> +  uint64_t uid = TipcPortId::GetUniqueId(id);
>>> +  if (portid == nullptr) {
>>> +    portid_map[uid] = portid = new TipcPortId(id, data_sock_fd,
>>> +        kChunkAckSize, sock_buf_size);
>>> +    m_MDS_LOG_NOTIFY("FCTRL: Add portid[node:%x, ref:%u svc_id:%u], 
>>> svc_cnt:%u",
>>> +        id.node, id.ref, svc_id, portid->svc_cnt_);
>>> +  } else {
>>> +    portid->svc_cnt_++;
>>> +    m_MDS_LOG_NOTIFY("FCTRL: Add svc[node:%x, ref:%u svc_id:%u], 
>>> svc_cnt:%u",
>>> +        id.node, id.ref, svc_id, portid->svc_cnt_);
>>> +  }
>>> +
>>> +  portid_map_mutex.unlock();
>>> +
>>> +  return NCSCC_RC_SUCCESS;
>>> +}
>>> +
>>> +uint32_t mds_tipc_fctrl_portid_down(struct tipc_portid id, uint32_t 
>>> type) {
>>> +  MDS_SVC_ID svc_id = (uint16_t)(type & MDS_EVENT_MASK_FOR_SVCID);
>>> +
>>> +  portid_map_mutex.lock();
>>> +
>>> +  // Decrement the service count of this tipc portid
>>> +  TipcPortId *portid = portid_lookup(id);
>>> +  if (portid != nullptr) {
>>> +    portid->svc_cnt_--;
>>> +    m_MDS_LOG_NOTIFY("FCTRL: Remove svc[node:%x, ref:%u svc_id:%u], 
>>> svc_cnt:%u",
>>> +        id.node, id.ref, svc_id, portid->svc_cnt_);
>>> +  }
>>> +  portid_map_mutex.unlock();
>>> +
>>> +  return NCSCC_RC_SUCCESS;
>>> +}
>>> +
>>> +uint32_t mds_tipc_fctrl_portid_terminate(struct tipc_portid id) {
>>> +  portid_map_mutex.lock();
>>> +
>>> +  // Delete this tipc portid out of the map
>>> +  TipcPortId *portid = portid_lookup(id);
>>> +  if (portid != nullptr) {
>>> +    delete portid;
>>> +    portid_map.erase(TipcPortId::GetUniqueId(id));
>>> +    m_MDS_LOG_NOTIFY("FCTRL: Remove portid[node:%x, ref:%u]", 
>>> id.node, id.ref);
>>> +  }
>>> +
>>> +  portid_map_mutex.unlock();
>>> +
>>> +  return NCSCC_RC_SUCCESS;
>>> +}
>>> +
>>> +uint32_t mds_tipc_fctrl_drop_data(uint8_t *buffer, uint16_t len,
>>> +    struct tipc_portid id) {
>>> +  HeaderMessage header;
>>> +  header.Decode(buffer);
>>> +  // if mds support flow control
>>> +  if ((header.pro_ver_ & MDS_PROT_VER_MASK) == MDS_PROT_FCTRL) {
>>> +    if (header.pro_id_ == MDS_PROT_FCTRL_ID) {
>>> +      if (header.msg_type_ == ChunkAck::kChunkAckMsgType) {
>>> +        // receive single ack message
>>> +        ChunkAck ack;
>>> +        ack.Decode(buffer);
>>> +        // send to the event thread
>>> +        if (m_NCS_IPC_SEND(&mbx_events,
>>> +            new Event(Event::Type::kEvtSendChunkAck, id, ack.svc_id_,
>>> +                header.mseq_, header.mfrag_, ack.acked_fseq_, 
>>> ack.chunk_size_),
>>> +            NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) {
>>> +          m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events\n");
>>> +          return NCSCC_RC_FAILURE;
>>> +        }
>>> +        return NCSCC_RC_SUCCESS;
>>> +      }
>>> +    } else {
>>> +      // receive data message
>>> +      DataMessage data;
>>> +      data.Decode(buffer);
>>> +      // send to the event thread
>>> +      if (m_NCS_IPC_SEND(&mbx_events,
>>> +          new Event(Event::Type::kEvtDropData, id, data.svc_id_,
>>> +              header.mseq_, header.mfrag_, header.fseq_),
>>> +          NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) {
>>> +        m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events\n");
>>> +        return NCSCC_RC_FAILURE;
>>> +      }
>>> +      return NCSCC_RC_SUCCESS;
>>> +    }
>>> +  }
>>> +
>>> +  return NCSCC_RC_SUCCESS;
>>> +}
>>> +
>>> +uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t len,
>>> +    struct tipc_portid id) {
>>> +  HeaderMessage header;
>>> +  header.Decode(buffer);
>>> +  // if mds support flow control
>>> +  if ((header.pro_ver_ & MDS_PROT_VER_MASK) == MDS_PROT_FCTRL) {
>>> +    if (header.pro_id_ == MDS_PROT_FCTRL_ID) {
>>> +      if (header.msg_type_ == ChunkAck::kChunkAckMsgType) {
>>> +        // receive single ack message
>>> +        ChunkAck ack;
>>> +        ack.Decode(buffer);
>>> +        // send to the event thread
>>> +        if (m_NCS_IPC_SEND(&mbx_events,
>>> +            new Event(Event::Type::kEvtRcvChunkAck, id, ack.svc_id_,
>>> +                header.mseq_, header.mfrag_, ack.acked_fseq_, 
>>> ack.chunk_size_),
>>> +                NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) {
>>> +          m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events\n");
>>> +        }
>>> +        // return NCSCC_RC_FAILURE, so the tipc receiving thread 
>>> (legacy) will
>>> +        // skip this data msg
>>> +        return NCSCC_RC_FAILURE;
>>> +      }
>>> +    } else {
>>> +      // receive data message
>>> +      DataMessage data;
>>> +      data.Decode(buffer);
>>> +      // todo: skip mcast/bcast, revisit
>>> +      if ((data.snd_type_ == MDS_SENDTYPE_BCAST ||
>>> +          data.snd_type_ == MDS_SENDTYPE_RBCAST) && 
>>> is_mcast_enabled) {
>>> +        return NCSCC_RC_SUCCESS;
>>> +      }
>>> +      portid_map_mutex.lock();
>>> +      uint32_t rc = process_flow_event(Event(Event::Type::kEvtRcvData,
>>> +          id, data.svc_id_, header.mseq_, header.mfrag_, 
>>> header.fseq_));
>>> +      portid_map_mutex.unlock();
>>> +      return rc;
>>> +    }
>>> +  }
>>> +  return NCSCC_RC_SUCCESS;
>>> +}
>>> diff --git a/src/mds/mds_tipc_fctrl_intf.h 
>>> b/src/mds/mds_tipc_fctrl_intf.h
>>> new file mode 100644
>>> index 0000000..85a058f
>>> --- /dev/null
>>> +++ b/src/mds/mds_tipc_fctrl_intf.h
>>> @@ -0,0 +1,47 @@
>>> +/*      -*- OpenSAF  -*-
>>> + *
>>> + * (C) Copyright 2019 The OpenSAF Foundation
>>> + *
>>> + * This program is distributed in the hope that it will be useful, but
>>> + * WITHOUT ANY WARRANTY; without even the implied warranty of 
>>> MERCHANTABILITY
>>> + * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are 
>>> licensed
>>> + * under the GNU Lesser General Public License Version 2.1, 
>>> February 1999.
>>> + * The complete license can be accessed from the following location:
>>> + * 
>>> https://protect2.fireeye.com/url?k=8c56609d-d0dfcff6-8c562006-0cc47ad93ea2-edf93c78f64bf311&q=1&u=http%3A%2F%2Fopensource.org%2Flicenses%2Flgpl-license.php
>>> + * See the Copying file included with the OpenSAF distribution for 
>>> full
>>> + * licensing terms.
>>> + *
>>> + * Author(s): Ericsson AB
>>> + *
>>> + */
>>> +
>>> +#ifndef MDS_MDS_TIPC_FCTRL_INTF_H_
>>> +#define MDS_MDS_TIPC_FCTRL_INTF_H_
>>> +
>>> +#include <linux/tipc.h>
>>> +#include <stdbool.h>
>>> +#include <stdint.h>
>>> +
>>> +#ifdef __cplusplus
>>> +extern "C" {
>>> +#endif
>>> +
>>> +uint32_t mds_tipc_fctrl_initialize(int dgramsock, struct 
>>> tipc_portid id,
>>> +    uint64_t rcv_buf_size, bool mbrcast_enabled);
>>> +uint32_t mds_tipc_fctrl_shutdown(void);
>>> +uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t len,
>>> +    struct tipc_portid id);
>>> +uint32_t mds_tipc_fctrl_portid_up(struct tipc_portid id, uint32_t 
>>> type);
>>> +uint32_t mds_tipc_fctrl_portid_down(struct tipc_portid id, uint32_t 
>>> type);
>>> +uint32_t mds_tipc_fctrl_portid_terminate(struct tipc_portid id);
>>> +uint32_t mds_tipc_fctrl_drop_data(uint8_t *buffer, uint16_t len,
>>> +    struct tipc_portid id);
>>> +uint32_t mds_tipc_fctrl_sndqueue_capable(struct tipc_portid id, 
>>> uint16_t len,
>>> +    uint16_t* next_seq);
>>> +uint32_t mds_tipc_fctrl_trysend(const uint8_t *buffer, uint16_t len,
>>> +    struct tipc_portid id);
>>> +#ifdef __cplusplus
>>> +}
>>> +#endif
>>> +
>>> +#endif  // MDS_MDS_TIPC_FCTRL_INTF_H_
>>> diff --git a/src/mds/mds_tipc_fctrl_msg.cc 
>>> b/src/mds/mds_tipc_fctrl_msg.cc
>>> new file mode 100644
>>> index 0000000..abd38d3
>>> --- /dev/null
>>> +++ b/src/mds/mds_tipc_fctrl_msg.cc
>>> @@ -0,0 +1,142 @@
>>> +/*      -*- OpenSAF  -*-
>>> + *
>>> + * (C) Copyright 2019 The OpenSAF Foundation
>>> + *
>>> + * This program is distributed in the hope that it will be useful, but
>>> + * WITHOUT ANY WARRANTY; without even the implied warranty of 
>>> MERCHANTABILITY
>>> + * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are 
>>> licensed
>>> + * under the GNU Lesser General Public License Version 2.1, 
>>> February 1999.
>>> + * The complete license can be accessed from the following location:
>>> + * 
>>> https://protect2.fireeye.com/url?k=6b1e74f3-3797db98-6b1e3468-0cc47ad93ea2-3e7c86e3dfcd12a1&q=1&u=http%3A%2F%2Fopensource.org%2Flicenses%2Flgpl-license.php
>>> + * See the Copying file included with the OpenSAF distribution for 
>>> full
>>> + * licensing terms.
>>> + *
>>> + * Author(s): Ericsson AB
>>> + *
>>> + */
>>> +
>>> +#include "mds/mds_tipc_fctrl_msg.h"
>>> +#include "base/ncssysf_def.h"
>>> +
>>> +namespace mds {
>>> +HeaderMessage::HeaderMessage(uint16_t msg_len, uint32_t mseq,
>>> +    uint16_t mfrag, uint16_t fseq): msg_len_(msg_len),
>>> +        mseq_(mseq), mfrag_(mfrag), fseq_(fseq) {
>>> +  pro_ver_ = MDS_PROT_FCTRL;
>>> +  pro_id_ = 0;
>>> +  msg_type_ = 0;
>>> +}
>>> +
>>> +void HeaderMessage::Encode(uint8_t *msg) {
>>> +  uint8_t *ptr;
>>> +
>>> +  // encode message length
>>> +  ptr = &msg[0];
>>> +  ncs_encode_16bit(&ptr, msg_len_);
>>> +  // encode sequence number
>>> +  ptr = &msg[2];
>>> +  ncs_encode_32bit(&ptr, mseq_);
>>> +  // encode fragment number
>>> +  ptr = &msg[6];
>>> +  ncs_encode_16bit(&ptr, mfrag_);
>>> +  // skip length_check: oct8&9
>>> +  // encode protocol version
>>> +  ptr = &msg[10];
>>> +  ncs_encode_8bit(&ptr, MDS_PROT_FCTRL);
>>> +}
>>> +
>>> +void HeaderMessage::Decode(uint8_t *msg) {
>>> +  uint8_t *ptr;
>>> +
>>> +  // decode message length
>>> +  ptr = &msg[0];
>>> +  msg_len_ = ncs_decode_16bit(&ptr);
>>> +  // decode sequence number
>>> +  ptr = &msg[2];
>>> +  mseq_ = ncs_decode_32bit(&ptr);
>>> +  // decode fragment number
>>> +  ptr = &msg[6];
>>> +  mfrag_ = ncs_decode_16bit(&ptr);
>>> +  // decode protocol version
>>> +  ptr = &msg[10];
>>> +  pro_ver_ = ncs_decode_8bit(&ptr);
>>> +  if ((pro_ver_ & MDS_PROT_VER_MASK) == MDS_PROT_FCTRL) {
>>> +    // decode flow control sequence number
>>> +    ptr = &msg[8];
>>> +    fseq_ = ncs_decode_16bit(&ptr);
>>> +    // decode protocol identifier
>>> +    ptr = &msg[11];
>>> +    pro_id_ = ncs_decode_32bit(&ptr);
>>> +    if (pro_id_ == MDS_PROT_FCTRL_ID) {
>>> +      // decode message type
>>> +      ptr = &msg[15];
>>> +      msg_type_ = ncs_decode_8bit(&ptr);
>>> +    }
>>> +  } else {
>>> +    if (mfrag_ != 0) {
>>> +      ptr = &msg[8];
>>> +      fseq_ = ncs_decode_16bit(&ptr);
>>> +      if (fseq_ != 0) pro_ver_ = MDS_PROT_FCTRL;
>>> +    }
>>> +  }
>>> +}
>>> +
>>> +void DataMessage::Decode(uint8_t *msg) {
>>> +  uint8_t *ptr;
>>> +
>>> +  // decode service id
>>> +  ptr = &msg[MDTM_PKT_TYPE_OFFSET +
>>> +             MDTM_FRAG_HDR_LEN +
>>> +             MDS_HEADER_RCVR_SVC_ID_POSITION];
>>> +  svc_id_ = ncs_decode_16bit(&ptr);
>>> +  // decode snd_type
>>> +  ptr = &msg[17];
>>> +  snd_type_ = (ncs_decode_8bit(&ptr)) & 0x3f;
>>> +}
>>> +
>>> +DataMessage::~DataMessage() {
>>> +  if (msg_data_ != nullptr) {
>>> +    delete msg_data_;
>>> +    msg_data_ = nullptr;
>>> +  }
>>> +}
>>> +
>>> +ChunkAck::ChunkAck(uint16_t svc_id, uint16_t fseq, uint16_t 
>>> chunk_size):
>>> +    svc_id_(svc_id), acked_fseq_(fseq), chunk_size_(chunk_size) {
>>> +  msg_type_ = kChunkAckMsgType;
>>> +}
>>> +
>>> +void ChunkAck::Encode(uint8_t *msg) {
>>> +  uint8_t *ptr;
>>> +  // encode protocol identifier
>>> +  ptr = &msg[11];
>>> +  ncs_encode_32bit(&ptr, MDS_PROT_FCTRL_ID);
>>> +  // encode message type
>>> +  ptr = &msg[15];
>>> +  ncs_encode_8bit(&ptr, kChunkAckMsgType);
>>> +  // encode service id
>>> +  ptr = &msg[16];
>>> +  ncs_encode_16bit(&ptr, svc_id_);
>>> +  // encode flow control sequence number
>>> +  ptr = &msg[18];
>>> +  ncs_encode_16bit(&ptr, acked_fseq_);
>>> +  // encode chunk size
>>> +  ptr = &msg[20];
>>> +  ncs_encode_16bit(&ptr, chunk_size_);
>>> +}
>>> +
>>> +void ChunkAck::Decode(uint8_t *msg) {
>>> +  uint8_t *ptr;
>>> +
>>> +  // decode service id
>>> +  ptr = &msg[16];
>>> +  svc_id_ = ncs_decode_16bit(&ptr);
>>> +  // decode flow control sequence number
>>> +  ptr = &msg[18];
>>> +  acked_fseq_ = ncs_decode_16bit(&ptr);
>>> +  // decode chunk size
>>> +  ptr = &msg[20];
>>> +  chunk_size_ = ncs_decode_16bit(&ptr);
>>> +}
>>> +
>>> +}  // end namespace mds
>>> diff --git a/src/mds/mds_tipc_fctrl_msg.h 
>>> b/src/mds/mds_tipc_fctrl_msg.h
>>> new file mode 100644
>>> index 0000000..677f256
>>> --- /dev/null
>>> +++ b/src/mds/mds_tipc_fctrl_msg.h
>>> @@ -0,0 +1,129 @@
>>> +/*      -*- OpenSAF  -*-
>>> + *
>>> + * (C) Copyright 2019 The OpenSAF Foundation
>>> + *
>>> + * This program is distributed in the hope that it will be useful, but
>>> + * WITHOUT ANY WARRANTY; without even the implied warranty of 
>>> MERCHANTABILITY
>>> + * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are 
>>> licensed
>>> + * under the GNU Lesser General Public License Version 2.1, 
>>> February 1999.
>>> + * The complete license can be accessed from the following location:
>>> + * 
>>> https://protect2.fireeye.com/url?k=4fa312f3-132abd98-4fa35268-0cc47ad93ea2-ca1523d965a4960f&q=1&u=http%3A%2F%2Fopensource.org%2Flicenses%2Flgpl-license.php
>>> + * See the Copying file included with the OpenSAF distribution for 
>>> full
>>> + * licensing terms.
>>> + *
>>> + * Author(s): Ericsson AB
>>> + *
>>> + */
>>> +
>>> +#ifndef MDS_MDS_TIPC_FCTRL_MSG_H_
>>> +#define MDS_MDS_TIPC_FCTRL_MSG_H_
>>> +
>>> +#include <linux/tipc.h>
>>> +#include "base/ncssysf_ipc.h"
>>> +#include "base/ncssysf_tmr.h"
>>> +#include "mds/mds_dt.h"
>>> +
>>> +namespace mds {
>>> +
>>> +class Event {
>>> + public:
>>> +  enum class Type {
>>> +    kEvtPortIdAll = 0,
>>> +    kEvtPortIdUp,     // event of new portid published (not used)
>>> +    kEvtPortIdDown,   // event of portid withdrawn (not used)
>>> +
>>> +    kEvtDataFlowAll,
>>> +    kEvtRcvData,       // event that received data msg from peer
>>> +    kEvtSendChunkAck,  // event to send the ack for a chunk of N 
>>> accumulative
>>> +                       // data msgs (N>=1)
>>> +    kEvtRcvChunkAck,   // event that received the ack for a chunk of N
>>> +                       // accumulative data msgs (N>=1)
>>> +    kEvtSendSelectiveAck,  // event to send the ack for a number of 
>>> selective
>>> +                           // data msgs (not supported)
>>> +    kEvtRcvSelectiveAck,   // event that received the ack for a 
>>> number of
>>> +                           // selective data msgs (not supported)
>>> +    kEvtDropData,          // event reported from tipc that a 
>>> message is not
>>> +                           // delivered
>>> +  };
>>> +  NCS_IPC_MSG next_{0};
>>> +  Type type_;
>>> +
>>> +  // Used for flow event only
>>> +  struct tipc_portid id_;
>>> +  uint16_t svc_id_{0};
>>> +  uint32_t mseq_{0};
>>> +  uint16_t mfrag_{0};
>>> +  uint16_t fseq_{0};
>>> +  uint16_t chunk_size_{1};
>>> +  explicit Event(Type type):type_(type) {}
>>> +  Event(Type type, struct tipc_portid id, uint16_t svc_id,
>>> +      uint32_t mseq, uint16_t mfrag, uint16_t f_seg_num):
>>> +    id_(id), svc_id_(svc_id),
>>> +    mseq_(mseq), mfrag_(mfrag), fseq_(f_seg_num) {
>>> +    type_ = type;
>>> +  }
>>> +  Event(Type type, struct tipc_portid id, uint16_t svc_id, uint32_t 
>>> mseq,
>>> +      uint16_t mfrag, uint16_t f_seg_num, uint16_t chunk_size):
>>> +    id_(id), svc_id_(svc_id), mseq_(mseq), mfrag_(mfrag),
>>> +    fseq_(f_seg_num), chunk_size_(chunk_size) {
>>> +    type_ = type;
>>> +  }
>>> +};
>>> +
>>> +class BaseMessage {
>>> + public:
>>> +  virtual ~BaseMessage() {}
>>> +  virtual void Decode(uint8_t *msg) {}
>>> +  virtual void Encode(uint8_t *msg) {}
>>> +};
>>> +
>>> +class HeaderMessage: public BaseMessage {
>>> + public:
>>> +  uint8_t* msg_ptr_{nullptr};
>>> +  uint16_t msg_len_{0};
>>> +  uint32_t mseq_{0};
>>> +  uint16_t mfrag_{0};
>>> +  uint16_t fseq_{0};
>>> +  uint8_t pro_ver_{0};
>>> +  uint32_t pro_id_{0};
>>> +  uint16_t msg_type_{0};
>>> +  HeaderMessage() {}
>>> +  HeaderMessage(uint16_t msg_len, uint32_t mseq, uint16_t mfrag,
>>> +      uint16_t fseq);
>>> +  virtual ~HeaderMessage() {}
>>> +  void Decode(uint8_t *msg) override;
>>> +  void Encode(uint8_t *msg) override;
>>> +};
>>> +
>>> +class DataMessage: public BaseMessage {
>>> + public:
>>> +  HeaderMessage header_;
>>> +  uint16_t svc_id_{0};
>>> +
>>> +  uint8_t* msg_data_{nullptr};
>>> +  uint8_t snd_type_{0};
>>> +
>>> +  DataMessage() {}
>>> +  virtual ~DataMessage();
>>> +  void Decode(uint8_t *msg) override;
>>> +};
>>> +
>>> +class ChunkAck: public BaseMessage {
>>> + public:
>>> +  static const uint8_t kChunkAckMsgType = 1;
>>> +  static const uint16_t kChunkAckMsgLength = 22;
>>> +
>>> +  uint8_t msg_type_{0};
>>> +  uint16_t svc_id_{0};
>>> +  uint16_t acked_fseq_{0};
>>> +  uint16_t chunk_size_{1};
>>> +  ChunkAck() {}
>>> +  ChunkAck(uint16_t svc_id, uint16_t fseq, uint16_t chunk_size);
>>> +  virtual ~ChunkAck() {}
>>> +  void Encode(uint8_t *msg) override;
>>> +  void Decode(uint8_t *msg) override;
>>> +};
>>> +
>>> +}  // end namespace mds
>>> +
>>> +#endif  // MDS_MDS_TIPC_FCTRL_MSG_H_
>>> diff --git a/src/mds/mds_tipc_fctrl_portid.cc 
>>> b/src/mds/mds_tipc_fctrl_portid.cc
>>> new file mode 100644
>>> index 0000000..24d13ee
>>> --- /dev/null
>>> +++ b/src/mds/mds_tipc_fctrl_portid.cc
>>> @@ -0,0 +1,261 @@
>>> +/*      -*- OpenSAF  -*-
>>> + *
>>> + * (C) Copyright 2019 The OpenSAF Foundation
>>> + *
>>> + * This program is distributed in the hope that it will be useful, but
>>> + * WITHOUT ANY WARRANTY; without even the implied warranty of 
>>> MERCHANTABILITY
>>> + * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are 
>>> licensed
>>> + * under the GNU Lesser General Public License Version 2.1, 
>>> February 1999.
>>> + * The complete license can be accessed from the following location:
>>> + * 
>>> https://protect2.fireeye.com/url?k=0f27255d-53ae8a36-0f2765c6-0cc47ad93ea2-763cb574242dc0e2&q=1&u=http%3A%2F%2Fopensource.org%2Flicenses%2Flgpl-license.php
>>> + * See the Copying file included with the OpenSAF distribution for 
>>> full
>>> + * licensing terms.
>>> + *
>>> + * Author(s): Ericsson AB
>>> + *
>>> + */
>>> +
>>> +#include "mds/mds_tipc_fctrl_portid.h"
>>> +#include "base/ncssysf_def.h"
>>> +
>>> +#include "mds/mds_dt.h"
>>> +#include "mds/mds_log.h"
>>> +
>>> +namespace mds {
>>> +
>>> +void MessageQueue::Queue(DataMessage* msg) {
>>> +  queue_.push_back(msg);
>>> +}
>>> +
>>> +DataMessage* MessageQueue::Find(uint32_t mseq, uint16_t mfrag) {
>>> +  for (auto it = queue_.begin(); it != queue_.end(); ++it) {
>>> +    DataMessage *m = *it;
>>> +    if (m->header_.mseq_ == mseq && m->header_.mfrag_ == mfrag) {
>>> +      return m;
>>> +    }
>>> +  }
>>> +  return nullptr;
>>> +}
>>> +
>>> +uint64_t MessageQueue::Erase(uint16_t fseq_from, uint16_t fseq_to) {
>>> +  uint64_t msg_len = 0;
>>> +  for (auto it = queue_.begin(); it != queue_.end();) {
>>> +    DataMessage *m = *it;
>>> +    if (fseq_from <= m->header_.fseq_ &&
>>> +        m->header_.fseq_ <= fseq_to) {
>>> +      msg_len += m->header_.msg_len_;
>>> +      it = queue_.erase(it);
>>> +      delete m;
>>> +    } else {
>>> +      it++;
>>> +    }
>>> +  }
>>> +  return msg_len;
>>> +}
>>> +
>>> +void MessageQueue::Clear() {
>>> +  while (queue_.empty() == false) {
>>> +    DataMessage* msg = queue_.front();
>>> +    queue_.pop_front();
>>> +    delete msg;
>>> +  }
>>> +}
>>> +
>>> +TipcPortId::TipcPortId(struct tipc_portid id, int sock, uint16_t 
>>> chksize,
>>> +    uint64_t sock_buf_size):
>>> +  id_(id), bsrsock_(sock), chunk_size_(chksize), 
>>> rcv_buf_size_(sock_buf_size) {
>>> +}
>>> +
>>> +TipcPortId::~TipcPortId() {
>>> +  // clear all msg in sndqueue_
>>> +  sndqueue_.Clear();
>>> +}
>>> +
>>> +uint64_t TipcPortId::GetUniqueId(struct tipc_portid id) {
>>> +  // this uid is equivalent to the mds adest
>>> +  uint64_t uid = ((uint64_t)id.node << 32) | (uint64_t)id.ref;
>>> +  return uid;
>>> +}
>>> +
>>> +uint32_t TipcPortId::Send(uint8_t* data, uint16_t length) {
>>> +  struct sockaddr_tipc server_addr;
>>> +  ssize_t send_len = 0;
>>> +  uint32_t rc = NCSCC_RC_SUCCESS;
>>> +
>>> +  memset(&server_addr, 0, sizeof(server_addr));
>>> +  server_addr.family = AF_TIPC;
>>> +  server_addr.addrtype = TIPC_ADDR_ID;
>>> +  server_addr.addr.id = id_;
>>> +  send_len = sendto(bsrsock_, data, length, 0,
>>> +        (struct sockaddr *)&server_addr, sizeof(server_addr));
>>> +
>>> +  if (send_len == length) {
>>> +    rc = NCSCC_RC_SUCCESS;
>>> +  } else {
>>> +    m_MDS_LOG_ERR("sendto() err :%s", strerror(errno));
>>> +    rc = NCSCC_RC_FAILURE;
>>> +  }
>>> +  return rc;
>>> +}
>>> +
>>> +uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length) {
>>> +  uint32_t rc = NCSCC_RC_SUCCESS;
>>> +
>>> +  DataMessage *msg = new DataMessage;
>>> +  msg->header_.Decode(const_cast<uint8_t*>(data));
>>> +  msg->Decode(const_cast<uint8_t*>(data));
>>> +  msg->msg_data_ = new uint8_t[length];
>>> +  memcpy(msg->msg_data_, data, length);
>>> +  sndqueue_.Queue(msg);
>>> +  ++sndwnd_.send_;
>>> +  sndwnd_.nacked_space_ += length;
>>> +  m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
>>> +      "SndData[mseq:%u, mfrag:%u, fseq:%u, len:%u], "
>>> +      "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
>>> +      id_.node, id_.ref,
>>> +      msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_, 
>>> length,
>>> +      sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
>>> +
>>> +  return rc;
>>> +}
>>> +
>>> +bool TipcPortId::ReceiveCapable(uint16_t sending_len) {
>>> +  return true;
>>> +}
>>> +
>>> +void TipcPortId::SendChunkAck(uint16_t fseq, uint16_t svc_id,
>>> +    uint16_t chksize) {
>>> +  uint8_t data[ChunkAck::kChunkAckMsgLength];
>>> +
>>> +  HeaderMessage header(ChunkAck::kChunkAckMsgLength, 0, 0, fseq);
>>> +  header.Encode(reinterpret_cast<uint8_t*>(&data));
>>> +
>>> +  ChunkAck sack(svc_id, fseq, chksize);
>>> +  sack.Encode(reinterpret_cast<uint8_t*>(&data));
>>> +  Send(reinterpret_cast<uint8_t*>(&data), 
>>> ChunkAck::kChunkAckMsgLength);
>>> +  m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
>>> +      "SndChkAck[fseq:%u, chunk:%u]",
>>> +      id_.node, id_.ref,
>>> +      fseq, chksize);
>>> +}
>>> +
>>> +uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag,
>>> +    uint16_t fseq, uint16_t svc_id) {
>>> +  uint32_t rc = NCSCC_RC_SUCCESS;
>>> +  // update receiver sequence window
>>> +  if (rcvwnd_.acked_ < fseq && rcvwnd_.rcv_ + 1 == fseq) {
>>> +    m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
>>> +        "RcvData[mseq:%u, mfrag:%u, fseq:%u], "
>>> +        "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "]",
>>> +        id_.node, id_.ref,
>>> +        mseq, mfrag, fseq,
>>> +        rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_);
>>> +
>>> +    ++rcvwnd_.rcv_;
>>> +    if (rcvwnd_.rcv_ - rcvwnd_.acked_ >= chunk_size_) {
>>> +      // send ack for @chunk_size_ msgs ending at fseq
>>> +      SendChunkAck(fseq, svc_id, chunk_size_);
>>> +      rcvwnd_.acked_ = rcvwnd_.rcv_;
>>> +    }
>>> +  } else {
>>> +    // todo: update rcvwnd_.nacked_space_.
>>> +    // This nacked_space_ will tell the number of bytes that have 
>>> not been acked
>>> +    // to the sender. If this nacked_space_ is growing large, and 
>>> approaching
>>> +    // the socket buffer size, the transmission of sender may be 
>>> queued.
>>> +    // this nacked_space_ can be used to detect if the 
>>> kChunkAckTimeout or
>>> +    // kChunkAckSize are set excessively large.
>>> +    // It is not used for now, so ignore it.
>>> +
>>> +    // check for transmission error
>>> +    if (rcvwnd_.rcv_ + 1 < fseq) {
>>> +      if (rcvwnd_.rcv_ == 0 && rcvwnd_.acked_ == 0) {
>>> +        // peer does not realize that this portid reset
>>> +        m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
>>> +            "RcvData[mseq:%u, mfrag:%u, fseq:%u], "
>>> +            "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "], "
>>> +            "Warning[portid reset]",
>>> +            id_.node, id_.ref,
>>> +            mseq, mfrag, fseq,
>>> +            rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_);
>>> +
>>> +        rcvwnd_.rcv_ = fseq;
>>> +      } else {
>>> +        rc = NCSCC_RC_FAILURE;
>>> +        // msg loss
>>> +        m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
>>> +            "RcvData[mseq:%u, mfrag:%u, fseq:%u], "
>>> +            "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "], "
>>> +            "Error[msg loss]",
>>> +            id_.node, id_.ref,
>>> +            mseq, mfrag, fseq,
>>> +            rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_);
>>> +      }
>>> +    }
>>> +    if (fseq <= rcvwnd_.acked_) {
>>> +      rc = NCSCC_RC_FAILURE;
>>> +      // unexpected retransmission
>>> +      m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
>>> +          "RcvData[mseq:%u, mfrag:%u, fseq:%u], "
>>> +          "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "], "
>>> +          "Error[unexpected retransmission]",
>>> +          id_.node, id_.ref,
>>> +          mseq, mfrag, fseq,
>>> +          rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_);
>>> +    }
>>> +  }
>>> +  return rc;
>>> +}
>>> +
>>> +void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) {
>>> +  // update sender sequence window
>>> +  if (sndwnd_.acked_ < fseq && fseq < sndwnd_.send_) {
>>> +    m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], "
>>> +        "RcvChkAck[fseq:%u, chunk:%u], "
>>> +        "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "], "
>>> +        "queue[size:%" PRIu64 "]",
>>> +        id_.node, id_.ref,
>>> +        fseq, chksize,
>>> +        sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_,
>>> +        sndqueue_.Size());
>>> +
>>> +    // fast forward the sndwnd_.acked_ sequence to fseq
>>> +    sndwnd_.acked_ = fseq;
>>> +
>>> +    // remove a number @chksize messages out of sndqueue_ and decrease
>>> +    // the nacked_space_ of sender
>>> +    sndwnd_.nacked_space_ -= sndqueue_.Erase(fseq - chksize + 1, 
>>> fseq);
>>> +  } else {
>>> +    m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], "
>>> +        "RcvChkAck[fseq:%u, chunk:%u], "
>>> +        "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "], "
>>> +        "queue[size:%" PRIu64 "], "
>>> +        "Error[msg disordered]",
>>> +        id_.node, id_.ref,
>>> +        fseq, chksize,
>>> +        sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_,
>>> +        sndqueue_.Size());
>>> +  }
>>> +}
>>> +
>>> +void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag,
>>> +    uint16_t fseq) {
>>> +  DataMessage* msg = sndqueue_.Find(mseq, mfrag);
>>> +  if (msg != nullptr) {
>>> +    // Resend the msg found
>>> +    Send(msg->msg_data_, msg->header_.msg_len_);
>>> +    m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], "
>>> +        "RsndData[mseq:%u, mfrag:%u, fseq:%u], "
>>> +        "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]",
>>> +        id_.node, id_.ref,
>>> +        mseq, mfrag, fseq,
>>> +        sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_);
>>> +  } else {
>>> +    m_MDS_LOG_ERR("FCTRL: [me] --> [node:%x, ref:%u], "
>>> +        "RsndData[mseq:%u, mfrag:%u, fseq:%u], "
>>> +        "Error[msg not found]",
>>> +        id_.node, id_.ref,
>>> +        mseq, mfrag, fseq);
>>> +  }
>>> +}
>>> +
>>> +}  // end namespace mds
>>> diff --git a/src/mds/mds_tipc_fctrl_portid.h 
>>> b/src/mds/mds_tipc_fctrl_portid.h
>>> new file mode 100644
>>> index 0000000..8068e6e
>>> --- /dev/null
>>> +++ b/src/mds/mds_tipc_fctrl_portid.h
>>> @@ -0,0 +1,87 @@
>>> +/*      -*- OpenSAF  -*-
>>> + *
>>> + * (C) Copyright 2019 The OpenSAF Foundation
>>> + *
>>> + * This program is distributed in the hope that it will be useful, but
>>> + * WITHOUT ANY WARRANTY; without even the implied warranty of 
>>> MERCHANTABILITY
>>> + * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are 
>>> licensed
>>> + * under the GNU Lesser General Public License Version 2.1, 
>>> February 1999.
>>> + * The complete license can be accessed from the following location:
>>> + * 
>>> https://protect2.fireeye.com/url?k=226846f8-7ee1e993-22680663-0cc47ad93ea2-e9b41ced4c57c419&q=1&u=http%3A%2F%2Fopensource.org%2Flicenses%2Flgpl-license.php
>>> + * See the Copying file included with the OpenSAF distribution for 
>>> full
>>> + * licensing terms.
>>> + *
>>> + * Author(s): Ericsson AB
>>> + *
>>> + */
>>> +
>>> +#ifndef MDS_MDS_TIPC_FCTRL_PORTID_H_
>>> +#define MDS_MDS_TIPC_FCTRL_PORTID_H_
>>> +
>>> +#include <linux/tipc.h>
>>> +#include <stdbool.h>
>>> +#include <stdint.h>
>>> +#include <stdio.h>
>>> +#include <unistd.h>
>>> +#include <deque>
>>> +#include "mds/mds_tipc_fctrl_msg.h"
>>> +
>>> +namespace mds {
>>> +
>>> +class MessageQueue {
>>> + public:
>>> +  void Queue(DataMessage* msg);
>>> +  DataMessage* Find(uint32_t mseq, uint16_t mfrag);
>>> +  uint64_t Erase(uint16_t fseq_from, uint16_t fseq_to);
>>> +  uint64_t Size() const { return queue_.size(); }
>>> +  void Clear();
>>> + private:
>>> +  std::deque<DataMessage*> queue_;
>>> +};
>>> +
>>> +class TipcPortId {
>>> + public:
>>> +  TipcPortId(struct tipc_portid id, int sock, uint16_t chunk_size,
>>> +      uint64_t sock_buf_size);
>>> +  ~TipcPortId();
>>> +  static uint64_t GetUniqueId(struct tipc_portid id);
>>> +  int GetSock() const { return bsrsock_; }
>>> +  uint16_t GetCurrentSeq() { return sndwnd_.send_; }
>>> +  bool ReceiveCapable(uint16_t sending_len);
>>> +  void ReceiveChunkAck(uint16_t fseq, uint16_t chunk_size);
>>> +  void SendChunkAck(uint16_t fseq, uint16_t svc_id, uint16_t 
>>> chunk_size);
>>> +  uint32_t ReceiveData(uint32_t mseq, uint16_t mfrag,
>>> +      uint16_t fseq, uint16_t svc_id);
>>> +  void ReceiveNack(uint32_t mseq, uint16_t mfrag, uint16_t fseq);
>>> +  uint32_t Send(uint8_t* data, uint16_t length);
>>> +  uint32_t Queue(const uint8_t* data, uint16_t length);
>>> +
>>> +  uint16_t svc_cnt_{1};  // number of services subscribed on this 
>>> portid
>>> +
>>> + private:
>>> +  struct tipc_portid id_;
>>> +  int bsrsock_;  // tipc socket to send/receive data per tipc_portid
>>> +  uint16_t chunk_size_{5};
>>> +  uint64_t rcv_buf_size_{0};  // estimated buffer size at receiver
>>> +
>>> +  struct sndwnd {
>>> +    // sender sequence window
>>> +    uint16_t acked_{0};  // last sequence has been acked by receiver
>>> +    uint16_t send_{1};   // next sequence to be sent
>>> +    uint64_t nacked_space_{0};  // total bytes are sent but not acked
>>> +  };
>>> +  struct sndwnd sndwnd_;
>>> +
>>> +  struct rcvwnd {
>>> +    // receiver sequence window
>>> +    uint16_t acked_{0};  // last sequence has been acked to sender
>>> +    uint16_t rcv_{0};    // last sequence has been received
>>> +    uint64_t nacked_space_{0};  // total bytes has not been acked
>>> +  };
>>> +  struct rcvwnd rcvwnd_;
>>> +
>>> +  MessageQueue sndqueue_;
>>> +};
>>> +
>>> +}  // end namespace mds
>>> +#endif  // MDS_MDS_TIPC_FCTRL_PORTID_H_

_______________________________________________
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel

Reply via email to