Hi Minh, see one comment below. /Thanks Hans
On 2019-08-23 03:48, Minh Hon Chau wrote: > Hi Hans, > > Thanks for your time to review the patch, please see my replies below > your comments. > > Regards, > > Minh > > On 22/8/19 11:07 pm, Hans Nordebäck wrote: >> Hi Minh, >> >> it is a large patch so i have to review parts of it, below are my >> comments, marked with [HansN], for files: >> >> src/mds/Makefile.am >> src/mds/mds_dt.h >> src/mds/mds_dt_tipc.c >> >> I'll continue with the rest of the files a bit later. /Thanks Hans >> >> On 2019-08-14 08:38, Minh Chau wrote: >>> This is a collaborative patch of two participants:Thuan, Minh. >>> >>> Main changes: >>> - Add mds_tipc_fctrl_intf.h, mds_tipc_fctrl_intf.cc: These two files >>> introduce new functions which are called in mds_dt_tipc.c if the flow >>> control is enabled >>> - Add mds_tipc_fctrl_portid.h, mds_tipc_fctrl_portid.cc: These files >>> implements the tipc portid instance, which supports the sliding window, >>> mds msg queue >>> - Add mds_tipc_fctrl_msg.h, mds_tipc_fctrl_msg.cc: These files define >>> the event and messages which are used for this solution. >>> --- >>> src/mds/Makefile.am | 10 +- >>> src/mds/mds_dt.h | 8 +- >>> src/mds/mds_dt_tipc.c | 188 +++++++++++++------- >>> src/mds/mds_tipc_fctrl_intf.cc | 376 >>> +++++++++++++++++++++++++++++++++++++++ >>> src/mds/mds_tipc_fctrl_intf.h | 47 +++++ >>> src/mds/mds_tipc_fctrl_msg.cc | 142 +++++++++++++++ >>> src/mds/mds_tipc_fctrl_msg.h | 129 ++++++++++++++ >>> src/mds/mds_tipc_fctrl_portid.cc | 261 +++++++++++++++++++++++++++ >>> src/mds/mds_tipc_fctrl_portid.h | 87 +++++++++ >>> 9 files changed, 1184 insertions(+), 64 deletions(-) >>> create mode 100644 src/mds/mds_tipc_fctrl_intf.cc >>> create mode 100644 src/mds/mds_tipc_fctrl_intf.h >>> create mode 100644 src/mds/mds_tipc_fctrl_msg.cc >>> create mode 100644 src/mds/mds_tipc_fctrl_msg.h >>> create mode 100644 src/mds/mds_tipc_fctrl_portid.cc >>> create mode 100644 src/mds/mds_tipc_fctrl_portid.h >>> >>> diff --git a/src/mds/Makefile.am b/src/mds/Makefile.am >>> index 2d7b652..d849e8f 100644 >>> --- a/src/mds/Makefile.am >>> +++ b/src/mds/Makefile.am >>> @@ -48,10 +48,16 @@ lib_libopensaf_core_la_SOURCES += \ >>> if ENABLE_TIPC_TRANSPORT >>> noinst_HEADERS += src/mds/mds_dt_tipc.h \ >>> src/mds/mds_tipc_recvq_stats.h \ >>> - src/mds/mds_tipc_recvq_stats_impl.h >>> + src/mds/mds_tipc_recvq_stats_impl.h \ >>> + src/mds/mds_tipc_fctrl_intf.h \ >>> + src/mds/mds_tipc_fctrl_portid.h \ >>> + src/mds/mds_tipc_fctrl_msg.h >>> lib_libopensaf_core_la_SOURCES += src/mds/mds_dt_tipc.c \ >>> src/mds/mds_tipc_recvq_stats.cc \ >>> - src/mds/mds_tipc_recvq_stats_impl.cc >>> + src/mds/mds_tipc_recvq_stats_impl.cc \ >>> + src/mds/mds_tipc_fctrl_intf.cc \ >>> + src/mds/mds_tipc_fctrl_portid.cc \ >>> + src/mds/mds_tipc_fctrl_msg.cc >>> endif >>> if ENABLE_TESTS >>> diff --git a/src/mds/mds_dt.h b/src/mds/mds_dt.h >>> index b645bb4..d9e8633 100644 >>> --- a/src/mds/mds_dt.h >>> +++ b/src/mds/mds_dt.h >>> @@ -162,7 +162,7 @@ uint32_t mdtm_del_from_ref_tbl(MDS_SUBTN_REF_VAL >>> ref); >>> uint32_t mds_tmr_mailbox_processing(void); >>> uint32_t mdtm_get_from_ref_tbl(MDS_SUBTN_REF_VAL ref, MDS_SVC_HDL >>> *svc_hdl); >>> uint32_t mdtm_add_frag_hdr(uint8_t *buf_ptr, uint16_t len, >>> uint32_t seq_num, >>> - uint16_t frag_byte); >>> + uint16_t frag_byte, uint16_t >>> fctrl_seq_num); >>> uint32_t mdtm_free_reassem_msg_mem(MDS_ENCODED_MSG *msg); >>> uint32_t mdtm_process_recv_data(uint8_t *buf, uint16_t len, >>> uint64_t tipc_id, >>> uint32_t *buff_dump); >>> @@ -240,9 +240,13 @@ bool mdtm_mailbox_mbx_cleanup(NCSCONTEXT arg, >>> NCSCONTEXT msg); >>> #define MDS_PROT 0xA0 >>> #define MDS_VERSION 0x08 >>> -#define MDS_PROT_VER_MASK (MDS_PROT | MDS_VERSION) >>> +#define MDS_PROT_VER_MASK 0xFC >>> #define MDTM_PRI_MASK 0x3 >>> +/* MDS protocol/version for flow control */ >>> +#define MDS_PROT_FCTRL (0xB0 | MDS_VERSION) >>> +#define MDS_PROT_FCTRL_ID 0x00AC13F5 >>> + >>> /* Added for the subscription changes */ >>> #define MDS_NCS_CHASSIS_ID (m_NCS_GET_NODE_ID & 0x00ff0000) >>> #define MDS_TIPC_COMMON_ID 0x01001000 >>> diff --git a/src/mds/mds_dt_tipc.c b/src/mds/mds_dt_tipc.c >>> index 86b52bb..fef1c50 100644 >>> --- a/src/mds/mds_dt_tipc.c >>> +++ b/src/mds/mds_dt_tipc.c >>> @@ -47,6 +47,7 @@ >>> #include "mds_dt_tipc.h" >>> #include "mds_dt_tcp_disc.h" >>> #include "mds_core.h" >>> +#include "mds_tipc_fctrl_intf.h" >>> #include "mds_tipc_recvq_stats.h" >>> #include "base/osaf_utility.h" >>> #include "base/osaf_poll.h" >>> @@ -165,20 +166,22 @@ NCS_PATRICIA_TREE mdtm_reassembly_list; >>> uint32_t mdtm_global_frag_num; >>> const unsigned int MAX_RECV_THRESHOLD = 30; >>> +uint8_t gl_mds_pro_ver = MDS_PROT | MDS_VERSION; >>> -static bool get_tipc_port_id(int sock, uint32_t* port_id) { >>> +static bool get_tipc_port_id(int sock, struct tipc_portid* port_id) { >>> struct sockaddr_tipc addr; >>> socklen_t sz = sizeof(addr); >>> memset(&addr, 0, sizeof(addr)); >>> - *port_id = 0; >>> + port_id->node = 0; >>> + port_id->ref = 0; >>> if (0 > getsockname(sock, (struct sockaddr *)&addr, &sz)) { >>> syslog(LOG_ERR, "MDTM:TIPC Failed to get socket name, >>> err: %s", >>> strerror(errno)); >>> return false; >>> } >>> - *port_id = addr.addr.id.ref; >>> + *port_id = addr.addr.id; >>> return true; >>> } >>> @@ -240,12 +243,13 @@ uint32_t mdtm_tipc_init(NODE_ID nodeid, >>> uint32_t *mds_tipc_ref) >>> } >>> /* Code for getting the self tipc random number */ >>> - if (!get_tipc_port_id(tipc_cb.BSRsock, mds_tipc_ref)) { >>> + struct tipc_portid port_id; >> [HansN] mds_tipc_ref was previously set to 0 in get_tipc_port_id , this >> is now missing. > [Minh] It's now the port_id.ref set to 0 in new get_tipc_port_id(), > then set it back to mds_tipc_ref. [HansN] if get_tipc_port_id returns false mds_tipc_ref is not set at return NCSCC_RETURN_FAILURE >>> + if (!get_tipc_port_id(tipc_cb.BSRsock, &port_id)) { >>> close(tipc_cb.Dsock); >>> close(tipc_cb.BSRsock); >>> return NCSCC_RC_FAILURE; >>> } >>> - >>> + *mds_tipc_ref = port_id.ref; >>> tipc_cb.adest = ((uint64_t)(nodeid)) << 32; >>> tipc_cb.adest |= *mds_tipc_ref; >>> tipc_cb.node_id = nodeid; >>> @@ -325,6 +329,23 @@ uint32_t mdtm_tipc_init(NODE_ID nodeid, >>> uint32_t *mds_tipc_ref) >>> mdtm_set_transport(MDTM_TX_TYPE_TIPC); >>> } >>> + /* Get tipc socket receive buffer size */ >> [HansN] int optval = 0; >>> + int optval; >>> + socklen_t optlen = sizeof(optval); >>> + if (getsockopt(tipc_cb.BSRsock, SOL_SOCKET, SO_RCVBUF, >>> + &optval, &optlen) != 0) { >>> + syslog(LOG_ERR, "MDTM: getsockopt() failed to get rcv buf >>> size to: %str", >>> + strerror(errno)); >>> + close(tipc_cb.Dsock); >>> + close(tipc_cb.BSRsock); >>> + return NCSCC_RC_FAILURE; >>> + } >>> + >>> + /* Create flow control tasks if enabled*/ >>> + gl_mds_pro_ver = MDS_PROT_FCTRL; >> [HansN] a question, optval is int (receive buffer size) and casted to >> uint64_t (the cast is not needed) in mds_tipc_fctrl_initialize, why use >> different types, int and uint64_t? > [Minh] a mistake, int should be fine. >>> + mds_tipc_fctrl_initialize(tipc_cb.BSRsock, port_id, >>> + (uint64_t)optval, tipc_mcast_enabled); >>> + >>> /* Create a task to receive the events and data */ >>> if (mdtm_create_rcv_task(tipc_cb.hdle_mdtm) != >>> NCSCC_RC_SUCCESS) { >>> syslog(LOG_ERR, >>> @@ -469,7 +490,7 @@ uint32_t mdtm_tipc_destroy(void) >>> NULL); >>> m_NCS_IPC_RELEASE(&tipc_cb.tmr_mbx, >>> (NCS_IPC_CB)mdtm_mailbox_mbx_cleanup); >>> - >>> + mds_tipc_fctrl_shutdown(); >>> /* Clear reference hdl list */ >>> while (mdtm_ref_hdl_list_hdr != NULL) { >>> /* Store temporary the pointer of entry to be deleted */ >>> @@ -672,36 +693,26 @@ ssize_t recvfrom_connectionless(int sd, void >>> *buf, size_t nbytes, int flags, >>> 0)); >>> if (anc_data[0] == TIPC_ERR_OVERLOAD) { >>> LOG_ER( >>> - "MDTM: From <0x%"PRIx32 ":%"PRIu32 "> >>> undeliverable message condition ancillary data: TIPC_ERR_OVERLOAD", >>> - ((struct sockaddr_tipc*) >>> from)->addr.id.node, >>> - ((struct sockaddr_tipc*) >>> from)->addr.id.ref); >>> - m_MDS_LOG_CRITICAL( >>> "MDTM: From <0x%"PRIx32 ":%"PRIu32 "> >>> undeliverable message condition ancillary data: TIPC_ERR_OVERLOAD ", >>> ((struct sockaddr_tipc*) >>> from)->addr.id.node, >>> ((struct sockaddr_tipc*) >>> from)->addr.id.ref); >>> - } else { >>> + } else if (anc_data[0] == TIPC_ERR_NO_PORT){ >>> /* TIPC_ERRINFO - TIPC error >>> * code associated with a >>> * returned data message or a >>> * connection termination >>> * message */ >>> - m_MDS_LOG_DBG( >>> - "MDTM: undelivered message condition >>> ancillary data: TIPC_ERRINFO err : %d", >>> - anc_data[0]); >>> + mds_tipc_fctrl_portid_terminate(((struct >>> sockaddr_tipc*)from)->addr.id); >>> + } else { >>> + m_MDS_LOG_ERR("MDTM:: TIPC_ERRINFO >>> anc_data[0]:%u", anc_data[0]); >>> } >>> } else if (anc->cmsg_type == TIPC_RETDATA) { >>> - /* If we set TIPC_DEST_DROPPABLE off >>> - message (configure TIPC to return >>> - rejected messages to the sender ) we >>> - will hit this when we implement MDS >>> - retransmit lost messages, can be >>> - replaced with flow control logic */ >>> /* TIPC_RETDATA -The contents of a >>> * returned data message */ >>> - LOG_IN( >>> - "MDTM: undelivered message condition >>> ancillary data: TIPC_RETDATA"); >>> - m_MDS_LOG_INFO( >>> - "MDTM: undelivered message condition >>> ancillary data: TIPC_RETDATA"); >>> + LOG_IN("MDTM: undelivered message condition >>> ancillary data: TIPC_RETDATA"); >>> + uint16_t ret_msg_len = anc->cmsg_len - >>> sizeof(*anc); >>> + unsigned char *ret_msg = CMSG_DATA(anc); >>> + mds_tipc_fctrl_drop_data(ret_msg, ret_msg_len, >>> ((struct sockaddr_tipc*)from)->addr.id); >>> } else if (anc->cmsg_type == TIPC_DESTNAME) { >>> if (sz == 0) { >>> m_MDS_LOG_DBG( >>> @@ -822,7 +833,7 @@ static uint32_t mdtm_process_recv_events(void) >>> m_MDS_LOG_INFO( >>> "MDTM: Published: "); >>> m_MDS_LOG_INFO( >>> - "MDTM: <%u,%u,%u> port id >>> <0x%08x:%u>\n", >>> + "MDTM: <%x,%x,%x> port id >>> <0x%08x:%u>\n", >>> NTOHL( >>> event.s >>> .seq >>> @@ -841,7 +852,12 @@ static uint32_t mdtm_process_recv_events(void) >>> event >>> .port >>> .ref)); >>> - >>> + struct tipc_portid id = { >>> + .node = NTOHL(event.port.node), >>> + .ref = NTOHL(event.port.ref) >>> + }; >>> + uint32_t type = >>> NTOHL(event.s.seq.type); >>> + mds_tipc_fctrl_portid_up(id, type); >>> if (NCSCC_RC_SUCCESS != >>> mdtm_process_discovery_events( >>> TIPC_PUBLISHED, >>> @@ -873,7 +889,12 @@ static uint32_t mdtm_process_recv_events(void) >>> event >>> .port >>> .ref)); >>> - >>> + struct tipc_portid id = { >>> + .node = NTOHL(event.port.node), >>> + .ref = NTOHL(event.port.ref) >>> + }; >>> + uint32_t type = >>> NTOHL(event.s.seq.type); >>> + mds_tipc_fctrl_portid_down(id, type); >>> if (NCSCC_RC_SUCCESS != >>> mdtm_process_discovery_events( >>> TIPC_WITHDRAWN, >>> @@ -986,8 +1007,6 @@ static uint32_t mdtm_process_recv_events(void) >>> break; >>> } >>> if (recd_bytes == 0) { >>> - m_MDS_LOG_DBG( >>> - "MDTM: recd bytes=0 on received on >>> sock, abnormal/unknown/hack condition. Ignoring"); >>> break; >>> } >>> data = inbuf; >>> @@ -1076,9 +1095,10 @@ static uint32_t mdtm_process_recv_events(void) >>> &buff_dump); >> [HansN] flow control seems not supported if MDS_CHECKSUM_ENABLE_FLAG is >> set? Either we document that flow control is not supported if >> MDS_CHECKSUM_ENABLE_FLAG is set >> >> or we support it? > [Minh] At this moment we do not support MDS_CHECKSUM_ENABLE_FLAG. By > far there seem to be not any applications set it on, but we can > support it in future by stepping up the MDS version. This will be > documented. >> >>> } >>> #else >>> - mdtm_process_recv_data( >>> - &inbuf[2], recd_bytes - 2, >>> - tipc_id, &buff_dump); >>> + if (mds_tipc_fctrl_rcv_data(inbuf, >>> recd_bytes, client_addr.addr.id) >>> + == NCSCC_RC_SUCCESS) { >>> + mdtm_process_recv_data(&inbuf[2], recd_bytes - 2, tipc_id, >>> &buff_dump); >>> + } >>> #endif >>> } else { >>> uint64_t tipc_id; >>> @@ -1873,9 +1893,9 @@ uint32_t mds_mdtm_svc_install_tipc(PW_ENV_ID >>> pwe_id, MDS_SVC_ID svc_id, >>> server_addr.addr.nameseq.upper = server_inst; >>> /* The self tipc random port number */ >>> - uint32_t port_id = 0; >>> + struct tipc_portid port_id; >>> get_tipc_port_id(tipc_cb.BSRsock, &port_id); >>> - m_MDS_LOG_NOTIFY("MDTM: install_tipc : <p:%u,s:%u,i:%u>", port_id, >>> + m_MDS_LOG_NOTIFY("MDTM: install_tipc : <p:%u,s:%u,i:%u>", >>> port_id.ref, >>> server_type, server_inst); >>> if (0 != bind(tipc_cb.BSRsock, (struct sockaddr >>> *)&server_addr, >>> @@ -2495,6 +2515,7 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ *req) >>> */ >>> uint32_t status = 0; >>> uint32_t sum_mds_hdr_plus_mdtm_hdr_plus_len; >> [HansN] indentation is not correct > [Minh] Will fix. >>> + uint16_t fctrl_seq_num = 0; >>> int version = req->msg_arch_word & 0x7; >>> if (version > 1) { >>> sum_mds_hdr_plus_mdtm_hdr_plus_len = >>> @@ -2583,11 +2604,16 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ *req) >>> mdtm_add_mds_hdr(buffer_ack, req)) { >>> return NCSCC_RC_FAILURE; >>> } >>> - >>> + /* if sndqueue is capable, then obtain the current >>> sending seq */ >>> + if (mds_tipc_fctrl_sndqueue_capable(tipc_id, len, >>> &fctrl_seq_num) >>> + == NCSCC_RC_FAILURE){ >>> + m_MDS_LOG_ERR("FCTRL: Failed to send message len :%d", >>> len); >>> + return NCSCC_RC_FAILURE; >>> + } >>> /* Add frag_hdr */ >>> if (NCSCC_RC_SUCCESS != >>> mdtm_add_frag_hdr(buffer_ack, len, frag_seq_num, >>> - 0)) { >>> + 0, fctrl_seq_num)) { >>> return NCSCC_RC_FAILURE; >>> } >>> @@ -2676,13 +2702,22 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ >>> *req) >>> free(body); >>> return NCSCC_RC_FAILURE; >>> } >>> + /* if sndqueue is capable, then obtain the current >>> sending seq */ >>> + if (mds_tipc_fctrl_sndqueue_capable(tipc_id, >>> + len + sum_mds_hdr_plus_mdtm_hdr_plus_len, >>> + &fctrl_seq_num) == NCSCC_RC_FAILURE){ >>> + m_MDS_LOG_ERR("FCTRL: Failed to send message >>> len :%d", >>> + len + sum_mds_hdr_plus_mdtm_hdr_plus_len); >>> + free(body); >>> + return NCSCC_RC_FAILURE; >>> + } >>> if (NCSCC_RC_SUCCESS != >>> mdtm_add_frag_hdr( >>> body, >>> (len + >>> sum_mds_hdr_plus_mdtm_hdr_plus_len), >>> - frag_seq_num, 0)) { >>> + frag_seq_num, 0, fctrl_seq_num)) { >>> m_MDS_LOG_ERR( >>> "MDTM: Unable to add the frag Hdr to the >>> send msg\n"); >>> m_MMGR_FREE_BUFR_LIST(usrbuf); >>> @@ -2778,12 +2813,23 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ *req) >>> req->msg.data.buff_info.buff); >>> return NCSCC_RC_FAILURE; >>> } >>> + /* if sndqueue is capable, then obtain the current >>> sending seq */ >>> + if (mds_tipc_fctrl_sndqueue_capable(tipc_id, >>> + req->msg.data.buff_info.len + >>> sum_mds_hdr_plus_mdtm_hdr_plus_len, >>> + &fctrl_seq_num) == NCSCC_RC_FAILURE) { >>> + m_MDS_LOG_ERR("FCTRL: Failed to send message len :%d", >>> + req->msg.data.buff_info.len + >>> sum_mds_hdr_plus_mdtm_hdr_plus_len); >>> + free(body); >>> + mds_free_direct_buff(req->msg.data.buff_info.buff); >>> + return NCSCC_RC_FAILURE; >>> + } >>> + >>> if (NCSCC_RC_SUCCESS != >>> mdtm_add_frag_hdr( >>> body, >>> req->msg.data.buff_info.len + >>> sum_mds_hdr_plus_mdtm_hdr_plus_len, >>> - frag_seq_num, 0)) { >>> + frag_seq_num, 0, fctrl_seq_num)) { >>> m_MDS_LOG_ERR( >>> "MDTM: Unable to add the frag Hdr to the send >>> msg\n"); >>> free(body); >>> @@ -2860,6 +2906,7 @@ uint32_t mdtm_frag_and_send(MDTM_SEND_REQ >>> *req, uint32_t seq_num, >>> uint16_t i = 1; >>> uint16_t frag_val = 0; >>> uint32_t sum_mds_hdr_plus_mdtm_hdr_plus_len; >>> + uint16_t fctrl_seq_num = 0; >>> int version = req->msg_arch_word & 0x7; >>> uint32_t ret = NCSCC_RC_SUCCESS; >>> @@ -2938,9 +2985,18 @@ uint32_t mdtm_frag_and_send(MDTM_SEND_REQ >>> *req, uint32_t seq_num, >>> return NCSCC_RC_FAILURE; >>> } >>> } >>> + /* if sndqueue is capable, then obtain the current >>> sending seq */ >>> + if (mds_tipc_fctrl_sndqueue_capable(id, len_buf, >>> &fctrl_seq_num) >>> + == NCSCC_RC_FAILURE) { >>> + m_MDS_LOG_ERR("FCTRL: Failed to send message len >>> :%d", len_buf); >>> + m_MMGR_FREE_BUFR_LIST(usrbuf); >>> + free(body); >>> + return NCSCC_RC_FAILURE; >>> + } >>> + >>> if (NCSCC_RC_SUCCESS != >>> mdtm_add_frag_hdr(body, len_buf, seq_num, >>> - frag_val)) { >>> + frag_val, fctrl_seq_num)) { >>> m_MDS_LOG_ERR( >>> "MDTM: Frag hde addition failed\n"); >>> m_MMGR_FREE_BUFR_LIST(usrbuf); >>> @@ -2996,7 +3052,7 @@ uint32_t mdtm_frag_and_send(MDTM_SEND_REQ >>> *req, uint32_t seq_num, >>> *********************************************************/ >>> uint32_t mdtm_add_frag_hdr(uint8_t *buf_ptr, uint16_t len, >>> uint32_t seq_num, >>> - uint16_t frag_byte) >>> + uint16_t frag_byte, uint16_t fctrl_seq_num) >>> { >>> /* Add the FRAG HDR to the Buffer */ >>> uint8_t *data; >>> @@ -3013,9 +3069,17 @@ uint32_t mdtm_add_frag_hdr(uint8_t *buf_ptr, >>> uint16_t len, uint32_t seq_num, >>> 5); /* 2 bytes for keeping len, to cross >>> check at the receiver end */ >>> #else >>> - ncs_encode_16bit(&data, len - MDTM_FRAG_HDR_LEN - >>> - 2); /* 2 bytes for keeping len, to cross >>> - check at the receiver end */ >>> + /* Next 2 bytes for keeping len, to cross check at the receiver >>> end. >>> + * Used to be encoded as below: >>> + * >>> + * ncs_encode_16bit(&data, len - MDTM_FRAG_HDR_LEN - 2); >>> + * >>> + * However, these 2 bytes have been never examined at receiver. >>> As backward >>> + * compatibility, the fragment header and mds data header can't >>> be extended, >>> + * hereafter, these 2 bytes will be used as sequence number in >>> flow control >>> + * (per tipc portid) >>> + * */ >>> + ncs_encode_16bit(&data, fctrl_seq_num); >>> #endif >>> return NCSCC_RC_SUCCESS; >>> } >>> @@ -3060,21 +3124,25 @@ static uint32_t mdtm_sendto(uint8_t *buffer, >>> uint16_t buff_len, >>> buffer[4] = checksum; >>> } >>> #endif >>> - >>> - send_len = sendto(tipc_cb.BSRsock, buffer, buff_len, 0, >>> - (struct sockaddr *)&server_addr, sizeof(server_addr)); >>> - if (send_len == buff_len) { >>> - m_MDS_LOG_INFO("MDTM: Successfully sent message"); >>> - return NCSCC_RC_SUCCESS; >>> - } else if (send_len == -1) { >>> - m_MDS_LOG_ERR("MDTM: Failed to send message err :%s", >>> - strerror(errno)); >>> - return NCSCC_RC_FAILURE; >>> - } else { >>> - m_MDS_LOG_ERR("MDTM: Failed to send message send_len :%zd", >>> - send_len); >>> - return NCSCC_RC_FAILURE; >> [HansN] NCSCC_RC_SUCCESS is returned even if no message has been sent as >> the return code from mds_tipc_fctrl_trysend only checks for >> NCSCC_RC_SUCCESS, else stmt missing? > [Minh] The 'else' statement is not missing, it may be a bit clear in > the patch 6/9. The mds_tipc_fctrl_trysend() returns FAILURE, either of > invalid tipc port id, or the flag of receiver's buffer overflow turns > on. In both cases, mds won't call sendto() to send the msg out. >>> + if (mds_tipc_fctrl_trysend(buffer, buff_len, id) == >>> NCSCC_RC_SUCCESS) { >>> + send_len = sendto(tipc_cb.BSRsock, buffer, buff_len, 0, >>> + (struct sockaddr *)&server_addr, >>> sizeof(server_addr)); >>> + if (send_len == buff_len) { >>> + m_MDS_LOG_INFO("MDTM: Successfully sent message"); >>> + return NCSCC_RC_SUCCESS; >>> + } else if (send_len == -1) { >>> + m_MDS_LOG_ERR("MDTM: Failed to send message err :%s", >>> strerror(errno)); >>> + // todo: it's failed to send msg, if flow control is >>> enabled >>> + // the msg needs to mark unsent >>> + return NCSCC_RC_FAILURE; >>> + } else { >>> + m_MDS_LOG_ERR("MDTM: Failed to send message send_len >>> :%zd", send_len); >>> + // todo: it's failed to send msg, if flow control is >>> enabled >>> + // the msg needs to mark unsent >>> + return NCSCC_RC_FAILURE; >>> + } >>> } >>> + return NCSCC_RC_SUCCESS; >>> } >>> /********************************************************* >>> @@ -3103,12 +3171,12 @@ static uint32_t mdtm_mcast_sendto(void >>> *buffer, size_t size, >>> server_addr.addr.nameseq.lower = HTONL(MDS_MDTM_LOWER_INSTANCE); >>> /*This can be scope-down to dest_svc_id server_inst TBD*/ >>> server_addr.addr.nameseq.upper = HTONL(MDS_MDTM_UPPER_INSTANCE); >>> - >>> int send_len = >>> sendto(tipc_cb.BSRsock, buffer, size, 0, >>> (struct sockaddr *)&server_addr, sizeof(server_addr)); >>> + >>> if (send_len == size) { >>> - m_MDS_LOG_INFO("MDTM: Successfully sent message"); >>> + m_MDS_LOG_INFO("MDTM: Successfully sent mcast message"); >>> return NCSCC_RC_SUCCESS; >>> } else { >>> m_MDS_LOG_ERR("MDTM: Failed to send Multicast message err >>> :%s", >>> @@ -3151,7 +3219,7 @@ static uint32_t mdtm_add_mds_hdr(uint8_t >>> *buffer, MDTM_SEND_REQ *req) >>> uint8_t *ptr; >>> ptr = buffer; >>> - prot_ver |= MDS_PROT | MDS_VERSION | ((uint8_t)(req->pri - 1)); >>> + prot_ver |= gl_mds_pro_ver | ((uint8_t)(req->pri - 1)); >>> enc_snd_type = (req->msg.encoding << 6); >>> enc_snd_type |= (uint8_t)req->snd_type; >>> diff --git a/src/mds/mds_tipc_fctrl_intf.cc >>> b/src/mds/mds_tipc_fctrl_intf.cc >>> new file mode 100644 >>> index 0000000..91b9107 >>> --- /dev/null >>> +++ b/src/mds/mds_tipc_fctrl_intf.cc >>> @@ -0,0 +1,376 @@ >>> +/* -*- OpenSAF -*- >>> + * >>> + * (C) Copyright 2019 The OpenSAF Foundation >>> + * >>> + * This program is distributed in the hope that it will be useful, but >>> + * WITHOUT ANY WARRANTY; without even the implied warranty of >>> MERCHANTABILITY >>> + * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are >>> licensed >>> + * under the GNU Lesser General Public License Version 2.1, >>> February 1999. >>> + * The complete license can be accessed from the following location: >>> + * >>> https://protect2.fireeye.com/url?k=a4ac81b5-f8252ede-a4acc12e-0cc47ad93ea2-93ec22754767d852&q=1&u=http%3A%2F%2Fopensource.org%2Flicenses%2Flgpl-license.php >>> + * See the Copying file included with the OpenSAF distribution for >>> full >>> + * licensing terms. >>> + * >>> + * Author(s): Ericsson AB >>> + * >>> + */ >>> +#include "mds/mds_tipc_fctrl_intf.h" >>> + >>> +#include <poll.h> >>> +#include <pthread.h> >>> +#include <stdio.h> >>> +#include <sys/poll.h> >>> +#include <unistd.h> >>> + >>> +#include <map> >>> +#include <mutex> >>> + >>> +#include "base/ncssysf_def.h" >>> +#include "base/ncssysf_tsk.h" >>> + >>> +#include "mds/mds_log.h" >>> +#include "mds/mds_tipc_fctrl_portid.h" >>> +#include "mds/mds_tipc_fctrl_msg.h" >>> + >>> +using mds::Event; >>> +using mds::TipcPortId; >>> +using mds::DataMessage; >>> +using mds::ChunkAck; >>> +using mds::HeaderMessage; >>> + >>> +namespace { >>> +// multicast/broadcast enabled >>> +// todo: to be removed if flow control support it >>> +bool is_mcast_enabled = true; >>> + >>> +// mailbox handles for events >>> +SYSF_MBX mbx_events; >>> + >>> +// ncs task handle >>> +NCSCONTEXT p_task_hdl = nullptr; >>> + >>> +// data socket associated with port id >>> +int data_sock_fd = 0; >>> +struct tipc_portid snd_rcv_portid; >>> + >>> +// socket receiver's buffer size >>> +// todo: This buffer size is read from the local node as an estimated >>> +// buffer size of the receiver sides, in facts that could be different >>> +// at the receiver's sides (in the other nodes). At this moment, we >>> +// assume all nodes in cluster have set the same tipc buffer size >>> +uint64_t sock_buf_size = 0; >>> + >>> +// map of key:unique id(adest), value: TipcPortId instance >>> +// unique id is derived from struct tipc_portid >>> +std::map<uint64_t, TipcPortId*> portid_map; >>> +std::mutex portid_map_mutex; >>> + >>> +// chunk ack parameters >>> +// todo: The chunk ack size should be configurable >>> +uint16_t kChunkAckSize = 3; >>> + >>> +TipcPortId* portid_lookup(struct tipc_portid id) { >>> + uint64_t uid = TipcPortId::GetUniqueId(id); >>> + if (portid_map.find(uid) == portid_map.end()) return nullptr; >>> + return portid_map[uid]; >>> +} >>> + >>> +uint32_t process_flow_event(const Event evt) { >>> + uint32_t rc = NCSCC_RC_SUCCESS; >>> + TipcPortId *portid = portid_lookup(evt.id_); >>> + if (portid == nullptr) { >>> + if (evt.type_ == Event::Type::kEvtRcvData) { >>> + portid = new TipcPortId(evt.id_, data_sock_fd, kChunkAckSize, >>> + sock_buf_size); >>> + portid_map[TipcPortId::GetUniqueId(evt.id_)] = portid; >>> + if (evt.type_ == Event::Type::kEvtRcvData) { >>> + rc = portid->ReceiveData(evt.mseq_, evt.mfrag_, >>> + evt.fseq_, evt.svc_id_); >>> + } >>> + } >>> + } else { >>> + if (evt.type_ == Event::Type::kEvtRcvData) { >>> + rc = portid->ReceiveData(evt.mseq_, evt.mfrag_, >>> + evt.fseq_, evt.svc_id_); >>> + } >>> + if (evt.type_ == Event::Type::kEvtRcvChunkAck) { >>> + portid->ReceiveChunkAck(evt.fseq_, evt.chunk_size_); >>> + } >>> + if (evt.type_ == Event::Type::kEvtSendChunkAck) { >>> + portid->SendChunkAck(evt.fseq_, evt.svc_id_, evt.chunk_size_); >>> + } >>> + if (evt.type_ == Event::Type::kEvtDropData) { >>> + portid->ReceiveNack(evt.mseq_, evt.mfrag_, >>> + evt.fseq_); >>> + } >>> + } >>> + return rc; >>> +} >>> + >>> +uint32_t process_all_events(void) { >>> + enum { FD_FCTRL = 0, NUM_FDS }; >>> + >>> + int poll_tmo = MDTM_TIPC_POLL_TIMEOUT; >>> + while (true) { >>> + int pollres; >>> + struct pollfd pfd[NUM_FDS] = {{0}}; >>> + >>> + pfd[FD_FCTRL].fd = >>> + ncs_ipc_get_sel_obj(&mbx_events).rmv_obj; >>> + pfd[FD_FCTRL].events = POLLIN; >>> + >>> + pollres = poll(pfd, NUM_FDS, poll_tmo); >>> + >>> + if (pollres == -1) { >>> + if (errno == EINTR) continue; >>> + m_MDS_LOG_ERR("FCTRL: poll() failed:%s", strerror(errno)); >>> + break; >>> + } >>> + >>> + if (pollres > 0) { >>> + if (pfd[FD_FCTRL].revents == POLLIN) { >>> + Event *evt = reinterpret_cast<Event*>(ncs_ipc_non_blk_recv( >>> + &mbx_events)); >>> + >>> + if (evt == nullptr) continue; >>> + >>> + portid_map_mutex.lock(); >>> + process_flow_event(*evt); >>> + delete evt; >>> + portid_map_mutex.unlock(); >>> + } >>> + } >>> + } /* while */ >>> + return NCSCC_RC_SUCCESS; >>> +} >>> + >>> +uint32_t create_ncs_task(void *task_hdl) { >>> + int policy = SCHED_OTHER; /*root defaults */ >>> + int max_prio = sched_get_priority_max(policy); >>> + int min_prio = sched_get_priority_min(policy); >>> + int prio_val = ((max_prio - min_prio) * 0.87); >>> + >>> + if (m_NCS_IPC_CREATE(&mbx_events) != NCSCC_RC_SUCCESS) { >>> + m_MDS_LOG_ERR("m_NCS_IPC_CREATE failed"); >>> + return NCSCC_RC_FAILURE; >>> + } >>> + if (m_NCS_IPC_ATTACH(&mbx_events) != NCSCC_RC_SUCCESS) { >>> + m_MDS_LOG_ERR("m_NCS_IPC_ATTACH failed"); >>> + return NCSCC_RC_FAILURE; >>> + } >>> + if (ncs_task_create((NCS_OS_CB)process_all_events, 0, >>> + "OSAF_MDS", prio_val, policy, NCS_MDTM_STACKSIZE, >>> + &task_hdl) != NCSCC_RC_SUCCESS) { >>> + m_MDS_LOG_ERR("FCTRL: Task Creation-failed:\n"); >>> + return NCSCC_RC_FAILURE; >>> + } >>> + >>> + m_MDS_LOG_NOTIFY("FCTRL: Start process_all_events"); >>> + return NCSCC_RC_SUCCESS; >>> +} >>> + >>> +} // end local namespace >>> + >>> +uint32_t mds_tipc_fctrl_initialize(int dgramsock, struct >>> tipc_portid id, >>> + uint64_t rcv_buf_size, bool mcast_enabled) { >>> + if (create_ncs_task(&p_task_hdl) != >>> + NCSCC_RC_SUCCESS) { >>> + m_MDS_LOG_ERR("FCTRL: Start of the Created Task-failed:\n"); >>> + return NCSCC_RC_FAILURE; >>> + } >>> + data_sock_fd = dgramsock; >>> + snd_rcv_portid = id; >>> + sock_buf_size = rcv_buf_size; >>> + is_mcast_enabled = mcast_enabled; >>> + >>> + m_MDS_LOG_NOTIFY("FCTRL: Initialize [node:%x, ref:%u]", >>> + id.node, id.ref); >>> + >>> + return NCSCC_RC_SUCCESS; >>> +} >>> + >>> +uint32_t mds_tipc_fctrl_shutdown(void) { >>> + if (ncs_task_release(p_task_hdl) != NCSCC_RC_SUCCESS) { >>> + m_MDS_LOG_ERR("FCTRL: Stop of the Created Task-failed:\n"); >>> + } >>> + return NCSCC_RC_SUCCESS; >>> +} >>> + >>> +uint32_t mds_tipc_fctrl_sndqueue_capable(struct tipc_portid id, >>> uint16_t len, >>> + uint16_t* next_seq) { >>> + uint32_t rc = NCSCC_RC_SUCCESS; >>> + >>> + portid_map_mutex.lock(); >>> + >>> + TipcPortId *portid = portid_lookup(id); >>> + if (portid == nullptr) { >>> + m_MDS_LOG_ERR("FCTRL: PortId not found [node:%x, ref:%u] line:%u", >>> + id.node, id.ref, __LINE__); >>> + rc = NCSCC_RC_FAILURE; >>> + } else { >>> + // assign the sequence number of the outgoing message >>> + *next_seq = portid->GetCurrentSeq(); >>> + } >>> + >>> + portid_map_mutex.unlock(); >>> + >>> + return rc; >>> +} >>> + >>> +uint32_t mds_tipc_fctrl_trysend(const uint8_t *buffer, uint16_t len, >>> + struct tipc_portid id) { >>> + uint32_t rc = NCSCC_RC_SUCCESS; >>> + >>> + portid_map_mutex.lock(); >>> + >>> + TipcPortId *portid = portid_lookup(id); >>> + if (portid == nullptr) { >>> + m_MDS_LOG_ERR("FCTRL: PortId not found [node:%x, ref:%u] line:%u", >>> + id.node, id.ref, __LINE__); >>> + rc = NCSCC_RC_FAILURE; >>> + } else { >>> + portid->Queue(buffer, len); >>> + } >>> + >>> + portid_map_mutex.unlock(); >>> + >>> + return rc; >>> +} >>> + >>> +uint32_t mds_tipc_fctrl_portid_up(struct tipc_portid id, uint32_t >>> type) { >>> + MDS_SVC_ID svc_id = (uint16_t)(type & MDS_EVENT_MASK_FOR_SVCID); >>> + >>> + portid_map_mutex.lock(); >>> + >>> + // Add this new tipc portid to the map >>> + TipcPortId *portid = portid_lookup(id); >>> + uint64_t uid = TipcPortId::GetUniqueId(id); >>> + if (portid == nullptr) { >>> + portid_map[uid] = portid = new TipcPortId(id, data_sock_fd, >>> + kChunkAckSize, sock_buf_size); >>> + m_MDS_LOG_NOTIFY("FCTRL: Add portid[node:%x, ref:%u svc_id:%u], >>> svc_cnt:%u", >>> + id.node, id.ref, svc_id, portid->svc_cnt_); >>> + } else { >>> + portid->svc_cnt_++; >>> + m_MDS_LOG_NOTIFY("FCTRL: Add svc[node:%x, ref:%u svc_id:%u], >>> svc_cnt:%u", >>> + id.node, id.ref, svc_id, portid->svc_cnt_); >>> + } >>> + >>> + portid_map_mutex.unlock(); >>> + >>> + return NCSCC_RC_SUCCESS; >>> +} >>> + >>> +uint32_t mds_tipc_fctrl_portid_down(struct tipc_portid id, uint32_t >>> type) { >>> + MDS_SVC_ID svc_id = (uint16_t)(type & MDS_EVENT_MASK_FOR_SVCID); >>> + >>> + portid_map_mutex.lock(); >>> + >>> + // Delete this tipc portid out of the map >>> + TipcPortId *portid = portid_lookup(id); >>> + if (portid != nullptr) { >>> + portid->svc_cnt_--; >>> + m_MDS_LOG_NOTIFY("FCTRL: Remove svc[node:%x, ref:%u svc_id:%u], >>> svc_cnt:%u", >>> + id.node, id.ref, svc_id, portid->svc_cnt_); >>> + } >>> + portid_map_mutex.unlock(); >>> + >>> + return NCSCC_RC_SUCCESS; >>> +} >>> + >>> +uint32_t mds_tipc_fctrl_portid_terminate(struct tipc_portid id) { >>> + portid_map_mutex.lock(); >>> + >>> + // Delete this tipc portid out of the map >>> + TipcPortId *portid = portid_lookup(id); >>> + if (portid != nullptr) { >>> + delete portid; >>> + portid_map.erase(TipcPortId::GetUniqueId(id)); >>> + m_MDS_LOG_NOTIFY("FCTRL: Remove portid[node:%x, ref:%u]", >>> id.node, id.ref); >>> + } >>> + >>> + portid_map_mutex.unlock(); >>> + >>> + return NCSCC_RC_SUCCESS; >>> +} >>> + >>> +uint32_t mds_tipc_fctrl_drop_data(uint8_t *buffer, uint16_t len, >>> + struct tipc_portid id) { >>> + HeaderMessage header; >>> + header.Decode(buffer); >>> + // if mds support flow control >>> + if ((header.pro_ver_ & MDS_PROT_VER_MASK) == MDS_PROT_FCTRL) { >>> + if (header.pro_id_ == MDS_PROT_FCTRL_ID) { >>> + if (header.msg_type_ == ChunkAck::kChunkAckMsgType) { >>> + // receive single ack message >>> + ChunkAck ack; >>> + ack.Decode(buffer); >>> + // send to the event thread >>> + if (m_NCS_IPC_SEND(&mbx_events, >>> + new Event(Event::Type::kEvtSendChunkAck, id, ack.svc_id_, >>> + header.mseq_, header.mfrag_, ack.acked_fseq_, >>> ack.chunk_size_), >>> + NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) { >>> + m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events\n"); >>> + return NCSCC_RC_FAILURE; >>> + } >>> + return NCSCC_RC_SUCCESS; >>> + } >>> + } else { >>> + // receive data message >>> + DataMessage data; >>> + data.Decode(buffer); >>> + // send to the event thread >>> + if (m_NCS_IPC_SEND(&mbx_events, >>> + new Event(Event::Type::kEvtDropData, id, data.svc_id_, >>> + header.mseq_, header.mfrag_, header.fseq_), >>> + NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) { >>> + m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events\n"); >>> + return NCSCC_RC_FAILURE; >>> + } >>> + return NCSCC_RC_SUCCESS; >>> + } >>> + } >>> + >>> + return NCSCC_RC_SUCCESS; >>> +} >>> + >>> +uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t len, >>> + struct tipc_portid id) { >>> + HeaderMessage header; >>> + header.Decode(buffer); >>> + // if mds support flow control >>> + if ((header.pro_ver_ & MDS_PROT_VER_MASK) == MDS_PROT_FCTRL) { >>> + if (header.pro_id_ == MDS_PROT_FCTRL_ID) { >>> + if (header.msg_type_ == ChunkAck::kChunkAckMsgType) { >>> + // receive single ack message >>> + ChunkAck ack; >>> + ack.Decode(buffer); >>> + // send to the event thread >>> + if (m_NCS_IPC_SEND(&mbx_events, >>> + new Event(Event::Type::kEvtRcvChunkAck, id, ack.svc_id_, >>> + header.mseq_, header.mfrag_, ack.acked_fseq_, >>> ack.chunk_size_), >>> + NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) { >>> + m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events\n"); >>> + } >>> + // return NCSCC_RC_FAILURE, so the tipc receiving thread >>> (legacy) will >>> + // skip this data msg >>> + return NCSCC_RC_FAILURE; >>> + } >>> + } else { >>> + // receive data message >>> + DataMessage data; >>> + data.Decode(buffer); >>> + // todo: skip mcast/bcast, revisit >>> + if ((data.snd_type_ == MDS_SENDTYPE_BCAST || >>> + data.snd_type_ == MDS_SENDTYPE_RBCAST) && >>> is_mcast_enabled) { >>> + return NCSCC_RC_SUCCESS; >>> + } >>> + portid_map_mutex.lock(); >>> + uint32_t rc = process_flow_event(Event(Event::Type::kEvtRcvData, >>> + id, data.svc_id_, header.mseq_, header.mfrag_, >>> header.fseq_)); >>> + portid_map_mutex.unlock(); >>> + return rc; >>> + } >>> + } >>> + return NCSCC_RC_SUCCESS; >>> +} >>> diff --git a/src/mds/mds_tipc_fctrl_intf.h >>> b/src/mds/mds_tipc_fctrl_intf.h >>> new file mode 100644 >>> index 0000000..85a058f >>> --- /dev/null >>> +++ b/src/mds/mds_tipc_fctrl_intf.h >>> @@ -0,0 +1,47 @@ >>> +/* -*- OpenSAF -*- >>> + * >>> + * (C) Copyright 2019 The OpenSAF Foundation >>> + * >>> + * This program is distributed in the hope that it will be useful, but >>> + * WITHOUT ANY WARRANTY; without even the implied warranty of >>> MERCHANTABILITY >>> + * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are >>> licensed >>> + * under the GNU Lesser General Public License Version 2.1, >>> February 1999. >>> + * The complete license can be accessed from the following location: >>> + * >>> https://protect2.fireeye.com/url?k=8c56609d-d0dfcff6-8c562006-0cc47ad93ea2-edf93c78f64bf311&q=1&u=http%3A%2F%2Fopensource.org%2Flicenses%2Flgpl-license.php >>> + * See the Copying file included with the OpenSAF distribution for >>> full >>> + * licensing terms. >>> + * >>> + * Author(s): Ericsson AB >>> + * >>> + */ >>> + >>> +#ifndef MDS_MDS_TIPC_FCTRL_INTF_H_ >>> +#define MDS_MDS_TIPC_FCTRL_INTF_H_ >>> + >>> +#include <linux/tipc.h> >>> +#include <stdbool.h> >>> +#include <stdint.h> >>> + >>> +#ifdef __cplusplus >>> +extern "C" { >>> +#endif >>> + >>> +uint32_t mds_tipc_fctrl_initialize(int dgramsock, struct >>> tipc_portid id, >>> + uint64_t rcv_buf_size, bool mbrcast_enabled); >>> +uint32_t mds_tipc_fctrl_shutdown(void); >>> +uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t len, >>> + struct tipc_portid id); >>> +uint32_t mds_tipc_fctrl_portid_up(struct tipc_portid id, uint32_t >>> type); >>> +uint32_t mds_tipc_fctrl_portid_down(struct tipc_portid id, uint32_t >>> type); >>> +uint32_t mds_tipc_fctrl_portid_terminate(struct tipc_portid id); >>> +uint32_t mds_tipc_fctrl_drop_data(uint8_t *buffer, uint16_t len, >>> + struct tipc_portid id); >>> +uint32_t mds_tipc_fctrl_sndqueue_capable(struct tipc_portid id, >>> uint16_t len, >>> + uint16_t* next_seq); >>> +uint32_t mds_tipc_fctrl_trysend(const uint8_t *buffer, uint16_t len, >>> + struct tipc_portid id); >>> +#ifdef __cplusplus >>> +} >>> +#endif >>> + >>> +#endif // MDS_MDS_TIPC_FCTRL_INTF_H_ >>> diff --git a/src/mds/mds_tipc_fctrl_msg.cc >>> b/src/mds/mds_tipc_fctrl_msg.cc >>> new file mode 100644 >>> index 0000000..abd38d3 >>> --- /dev/null >>> +++ b/src/mds/mds_tipc_fctrl_msg.cc >>> @@ -0,0 +1,142 @@ >>> +/* -*- OpenSAF -*- >>> + * >>> + * (C) Copyright 2019 The OpenSAF Foundation >>> + * >>> + * This program is distributed in the hope that it will be useful, but >>> + * WITHOUT ANY WARRANTY; without even the implied warranty of >>> MERCHANTABILITY >>> + * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are >>> licensed >>> + * under the GNU Lesser General Public License Version 2.1, >>> February 1999. >>> + * The complete license can be accessed from the following location: >>> + * >>> https://protect2.fireeye.com/url?k=6b1e74f3-3797db98-6b1e3468-0cc47ad93ea2-3e7c86e3dfcd12a1&q=1&u=http%3A%2F%2Fopensource.org%2Flicenses%2Flgpl-license.php >>> + * See the Copying file included with the OpenSAF distribution for >>> full >>> + * licensing terms. >>> + * >>> + * Author(s): Ericsson AB >>> + * >>> + */ >>> + >>> +#include "mds/mds_tipc_fctrl_msg.h" >>> +#include "base/ncssysf_def.h" >>> + >>> +namespace mds { >>> +HeaderMessage::HeaderMessage(uint16_t msg_len, uint32_t mseq, >>> + uint16_t mfrag, uint16_t fseq): msg_len_(msg_len), >>> + mseq_(mseq), mfrag_(mfrag), fseq_(fseq) { >>> + pro_ver_ = MDS_PROT_FCTRL; >>> + pro_id_ = 0; >>> + msg_type_ = 0; >>> +} >>> + >>> +void HeaderMessage::Encode(uint8_t *msg) { >>> + uint8_t *ptr; >>> + >>> + // encode message length >>> + ptr = &msg[0]; >>> + ncs_encode_16bit(&ptr, msg_len_); >>> + // encode sequence number >>> + ptr = &msg[2]; >>> + ncs_encode_32bit(&ptr, mseq_); >>> + // encode sequence number >>> + ptr = &msg[6]; >>> + ncs_encode_16bit(&ptr, mfrag_); >>> + // skip length_check: oct8&9 >>> + // encode protocol version >>> + ptr = &msg[10]; >>> + ncs_encode_8bit(&ptr, MDS_PROT_FCTRL); >>> +} >>> + >>> +void HeaderMessage::Decode(uint8_t *msg) { >>> + uint8_t *ptr; >>> + >>> + // decode message length >>> + ptr = &msg[0]; >>> + msg_len_ = ncs_decode_16bit(&ptr); >>> + // decode sequence number >>> + ptr = &msg[2]; >>> + mseq_ = ncs_decode_32bit(&ptr); >>> + // decode fragment number >>> + ptr = &msg[6]; >>> + mfrag_ = ncs_decode_16bit(&ptr); >>> + // decode protocol version >>> + ptr = &msg[10]; >>> + pro_ver_ = ncs_decode_8bit(&ptr); >>> + if ((pro_ver_ & MDS_PROT_VER_MASK) == MDS_PROT_FCTRL) { >>> + // decode flow control sequence number >>> + ptr = &msg[8]; >>> + fseq_ = ncs_decode_16bit(&ptr); >>> + // decode protocol identifier >>> + ptr = &msg[11]; >>> + pro_id_ = ncs_decode_32bit(&ptr); >>> + if (pro_id_ == MDS_PROT_FCTRL_ID) { >>> + // decode message type >>> + ptr = &msg[15]; >>> + msg_type_ = ncs_decode_8bit(&ptr); >>> + } >>> + } else { >>> + if (mfrag_ != 0) { >>> + ptr = &msg[8]; >>> + fseq_ = ncs_decode_16bit(&ptr); >>> + if (fseq_ != 0) pro_ver_ = MDS_PROT_FCTRL; >>> + } >>> + } >>> +} >>> + >>> +void DataMessage::Decode(uint8_t *msg) { >>> + uint8_t *ptr; >>> + >>> + // decode service id >>> + ptr = &msg[MDTM_PKT_TYPE_OFFSET + >>> + MDTM_FRAG_HDR_LEN + >>> + MDS_HEADER_RCVR_SVC_ID_POSITION]; >>> + svc_id_ = ncs_decode_16bit(&ptr); >>> + // decode snd_type >>> + ptr = &msg[17]; >>> + snd_type_ = (ncs_decode_8bit(&ptr)) & 0x3f; >>> +} >>> + >>> +DataMessage::~DataMessage() { >>> + if (msg_data_ != nullptr) { >>> + delete msg_data_; >>> + msg_data_ = nullptr; >>> + } >>> +} >>> + >>> +ChunkAck::ChunkAck(uint16_t svc_id, uint16_t fseq, uint16_t >>> chunk_size): >>> + svc_id_(svc_id), acked_fseq_(fseq), chunk_size_(chunk_size) { >>> + msg_type_ = kChunkAckMsgType; >>> +} >>> + >>> +void ChunkAck::Encode(uint8_t *msg) { >>> + uint8_t *ptr; >>> + // encode protocol identifier >>> + ptr = &msg[11]; >>> + ncs_encode_32bit(&ptr, MDS_PROT_FCTRL_ID); >>> + // encode message type >>> + ptr = &msg[15]; >>> + ncs_encode_8bit(&ptr, kChunkAckMsgType); >>> + // encode service id >>> + ptr = &msg[16]; >>> + ncs_encode_16bit(&ptr, svc_id_); >>> + // encode flow control sequence number >>> + ptr = &msg[18]; >>> + ncs_encode_16bit(&ptr, acked_fseq_); >>> + // encode chunk size >>> + ptr = &msg[20]; >>> + ncs_encode_16bit(&ptr, chunk_size_); >>> +} >>> + >>> +void ChunkAck::Decode(uint8_t *msg) { >>> + uint8_t *ptr; >>> + >>> + // decode service id >>> + ptr = &msg[16]; >>> + svc_id_ = ncs_decode_16bit(&ptr); >>> + // decode flow control sequence number >>> + ptr = &msg[18]; >>> + acked_fseq_ = ncs_decode_16bit(&ptr); >>> + // decode chunk size >>> + ptr = &msg[20]; >>> + chunk_size_ = ncs_decode_16bit(&ptr); >>> +} >>> + >>> +} // end namespace mds >>> diff --git a/src/mds/mds_tipc_fctrl_msg.h >>> b/src/mds/mds_tipc_fctrl_msg.h >>> new file mode 100644 >>> index 0000000..677f256 >>> --- /dev/null >>> +++ b/src/mds/mds_tipc_fctrl_msg.h >>> @@ -0,0 +1,129 @@ >>> +/* -*- OpenSAF -*- >>> + * >>> + * (C) Copyright 2019 The OpenSAF Foundation >>> + * >>> + * This program is distributed in the hope that it will be useful, but >>> + * WITHOUT ANY WARRANTY; without even the implied warranty of >>> MERCHANTABILITY >>> + * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are >>> licensed >>> + * under the GNU Lesser General Public License Version 2.1, >>> February 1999. >>> + * The complete license can be accessed from the following location: >>> + * >>> https://protect2.fireeye.com/url?k=4fa312f3-132abd98-4fa35268-0cc47ad93ea2-ca1523d965a4960f&q=1&u=http%3A%2F%2Fopensource.org%2Flicenses%2Flgpl-license.php >>> + * See the Copying file included with the OpenSAF distribution for >>> full >>> + * licensing terms. >>> + * >>> + * Author(s): Ericsson AB >>> + * >>> + */ >>> + >>> +#ifndef MDS_MDS_TIPC_FCTRL_MSG_H_ >>> +#define MDS_MDS_TIPC_FCTRL_MSG_H_ >>> + >>> +#include <linux/tipc.h> >>> +#include "base/ncssysf_ipc.h" >>> +#include "base/ncssysf_tmr.h" >>> +#include "mds/mds_dt.h" >>> + >>> +namespace mds { >>> + >>> +class Event { >>> + public: >>> + enum class Type { >>> + kEvtPortIdAll = 0, >>> + kEvtPortIdUp, // event of new portid published (not used) >>> + kEvtPortIdDown, // event of portid widthdrawn (not used) >>> + >>> + kEvtDataFlowAll, >>> + kEvtRcvData, // event that received data msg from peer >>> + kEvtSendChunkAck, // event to send the ack for a chunk of N >>> accumulative >>> + // data msgs (N>=1) >>> + kEvtRcvChunkAck, // event that received the ack for a chunk of N >>> + // accumulative data msgs (N>=1) >>> + kEvtSendSelectiveAck, // event to send the ack for a number of >>> selective >>> + // data msgs (not supported) >>> + kEvtRcvSelectiveAck, // event that received the ack for a >>> numer of >>> + // selective data msgs (not supported) >>> + kEvtDropData, // event reported from tipc that a >>> message is not >>> + // delivered >>> + }; >>> + NCS_IPC_MSG next_{0}; >>> + Type type_; >>> + >>> + // Used for flow event only >>> + struct tipc_portid id_; >>> + uint16_t svc_id_{0}; >>> + uint32_t mseq_{0}; >>> + uint16_t mfrag_{0}; >>> + uint16_t fseq_{0}; >>> + uint16_t chunk_size_{1}; >>> + explicit Event(Type type):type_(type) {} >>> + Event(Type type, struct tipc_portid id, uint16_t svc_id, >>> + uint32_t mseq, uint16_t mfrag, uint16_t f_seg_num): >>> + id_(id), svc_id_(svc_id), >>> + mseq_(mseq), mfrag_(mfrag), fseq_(f_seg_num) { >>> + type_ = type; >>> + } >>> + Event(Type type, struct tipc_portid id, uint16_t svc_id, uint32_t >>> mseq, >>> + uint16_t mfrag, uint16_t f_seg_num, uint16_t chunk_size): >>> + id_(id), svc_id_(svc_id), mseq_(mseq), mfrag_(mfrag), >>> + fseq_(f_seg_num), chunk_size_(chunk_size) { >>> + type_ = type; >>> + } >>> +}; >>> + >>> +class BaseMessage { >>> + public: >>> + virtual ~BaseMessage() {} >>> + virtual void Decode(uint8_t *msg) {} >>> + virtual void Encode(uint8_t *msg) {} >>> +}; >>> + >>> +class HeaderMessage: public BaseMessage { >>> + public: >>> + uint8_t* msg_ptr_{nullptr}; >>> + uint16_t msg_len_{0}; >>> + uint32_t mseq_{0}; >>> + uint16_t mfrag_{0}; >>> + uint16_t fseq_{0}; >>> + uint8_t pro_ver_{0}; >>> + uint32_t pro_id_{0}; >>> + uint16_t msg_type_{0}; >>> + HeaderMessage() {} >>> + HeaderMessage(uint16_t msg_len, uint32_t mseq, uint16_t mfrag, >>> + uint16_t fseq); >>> + virtual ~HeaderMessage() {} >>> + void Decode(uint8_t *msg) override; >>> + void Encode(uint8_t *msg) override; >>> +}; >>> + >>> +class DataMessage: public BaseMessage { >>> + public: >>> + HeaderMessage header_; >>> + uint16_t svc_id_{0}; >>> + >>> + uint8_t* msg_data_{nullptr}; >>> + uint8_t snd_type_{0}; >>> + >>> + DataMessage() {} >>> + virtual ~DataMessage(); >>> + void Decode(uint8_t *msg) override; >>> +}; >>> + >>> +class ChunkAck: public BaseMessage { >>> + public: >>> + static const uint8_t kChunkAckMsgType = 1; >>> + static const uint16_t kChunkAckMsgLength = 22; >>> + >>> + uint8_t msg_type_{0}; >>> + uint16_t svc_id_{0}; >>> + uint16_t acked_fseq_{0}; >>> + uint16_t chunk_size_{1}; >>> + ChunkAck() {} >>> + ChunkAck(uint16_t svc_id, uint16_t fseq, uint16_t chunk_size); >>> + virtual ~ChunkAck() {} >>> + void Encode(uint8_t *msg) override; >>> + void Decode(uint8_t *msg) override; >>> +}; >>> + >>> +} // end namespace mds >>> + >>> +#endif // MDS_MDS_TIPC_FCTRL_MSG_H_ >>> diff --git a/src/mds/mds_tipc_fctrl_portid.cc >>> b/src/mds/mds_tipc_fctrl_portid.cc >>> new file mode 100644 >>> index 0000000..24d13ee >>> --- /dev/null >>> +++ b/src/mds/mds_tipc_fctrl_portid.cc >>> @@ -0,0 +1,261 @@ >>> +/* -*- OpenSAF -*- >>> + * >>> + * (C) Copyright 2019 The OpenSAF Foundation >>> + * >>> + * This program is distributed in the hope that it will be useful, but >>> + * WITHOUT ANY WARRANTY; without even the implied warranty of >>> MERCHANTABILITY >>> + * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are >>> licensed >>> + * under the GNU Lesser General Public License Version 2.1, >>> February 1999. >>> + * The complete license can be accessed from the following location: >>> + * >>> https://protect2.fireeye.com/url?k=0f27255d-53ae8a36-0f2765c6-0cc47ad93ea2-763cb574242dc0e2&q=1&u=http%3A%2F%2Fopensource.org%2Flicenses%2Flgpl-license.php >>> + * See the Copying file included with the OpenSAF distribution for >>> full >>> + * licensing terms. >>> + * >>> + * Author(s): Ericsson AB >>> + * >>> + */ >>> + >>> +#include "mds/mds_tipc_fctrl_portid.h" >>> +#include "base/ncssysf_def.h" >>> + >>> +#include "mds/mds_dt.h" >>> +#include "mds/mds_log.h" >>> + >>> +namespace mds { >>> + >>> +void MessageQueue::Queue(DataMessage* msg) { >>> + queue_.push_back(msg); >>> +} >>> + >>> +DataMessage* MessageQueue::Find(uint32_t mseq, uint16_t mfrag) { >>> + for (auto it = queue_.begin(); it != queue_.end(); ++it) { >>> + DataMessage *m = *it; >>> + if (m->header_.mseq_ == mseq && m->header_.mfrag_ == mfrag) { >>> + return m; >>> + } >>> + } >>> + return nullptr; >>> +} >>> + >>> +uint64_t MessageQueue::Erase(uint16_t fseq_from, uint16_t fseq_to) { >>> + uint64_t msg_len = 0; >>> + for (auto it = queue_.begin(); it != queue_.end();) { >>> + DataMessage *m = *it; >>> + if (fseq_from <= m->header_.fseq_ && >>> + m->header_.fseq_ <= fseq_to) { >>> + msg_len += m->header_.msg_len_; >>> + it = queue_.erase(it); >>> + delete m; >>> + } else { >>> + it++; >>> + } >>> + } >>> + return msg_len; >>> +} >>> + >>> +void MessageQueue::Clear() { >>> + while (queue_.empty() == false) { >>> + DataMessage* msg = queue_.front(); >>> + queue_.pop_front(); >>> + delete msg; >>> + } >>> +} >>> + >>> +TipcPortId::TipcPortId(struct tipc_portid id, int sock, uint16_t >>> chksize, >>> + uint64_t sock_buf_size): >>> + id_(id), bsrsock_(sock), chunk_size_(chksize), >>> rcv_buf_size_(sock_buf_size) { >>> +} >>> + >>> +TipcPortId::~TipcPortId() { >>> + // clear all msg in sndqueue_ >>> + sndqueue_.Clear(); >>> +} >>> + >>> +uint64_t TipcPortId::GetUniqueId(struct tipc_portid id) { >>> + // this uid is equivalent to the mds adest >>> + uint64_t uid = ((uint64_t)id.node << 32) | (uint64_t)id.ref; >>> + return uid; >>> +} >>> + >>> +uint32_t TipcPortId::Send(uint8_t* data, uint16_t length) { >>> + struct sockaddr_tipc server_addr; >>> + ssize_t send_len = 0; >>> + uint32_t rc = NCSCC_RC_SUCCESS; >>> + >>> + memset(&server_addr, 0, sizeof(server_addr)); >>> + server_addr.family = AF_TIPC; >>> + server_addr.addrtype = TIPC_ADDR_ID; >>> + server_addr.addr.id = id_; >>> + send_len = sendto(bsrsock_, data, length, 0, >>> + (struct sockaddr *)&server_addr, sizeof(server_addr)); >>> + >>> + if (send_len == length) { >>> + rc = NCSCC_RC_SUCCESS; >>> + } else { >>> + m_MDS_LOG_ERR("sendto() err :%s", strerror(errno)); >>> + rc = NCSCC_RC_FAILURE; >>> + } >>> + return rc; >>> +} >>> + >>> +uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length) { >>> + uint32_t rc = NCSCC_RC_SUCCESS; >>> + >>> + DataMessage *msg = new DataMessage; >>> + msg->header_.Decode(const_cast<uint8_t*>(data)); >>> + msg->Decode(const_cast<uint8_t*>(data)); >>> + msg->msg_data_ = new uint8_t[length]; >>> + memcpy(msg->msg_data_, data, length); >>> + sndqueue_.Queue(msg); >>> + ++sndwnd_.send_; >>> + sndwnd_.nacked_space_ += length; >>> + m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], " >>> + "SndData[mseq:%u, mfrag:%u, fseq:%u, len:%u], " >>> + "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", >>> + id_.node, id_.ref, >>> + msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_, >>> length, >>> + sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_); >>> + >>> + return rc; >>> +} >>> + >>> +bool TipcPortId::ReceiveCapable(uint16_t sending_len) { >>> + return true; >>> +} >>> + >>> +void TipcPortId::SendChunkAck(uint16_t fseq, uint16_t svc_id, >>> + uint16_t chksize) { >>> + uint8_t data[ChunkAck::kChunkAckMsgLength]; >>> + >>> + HeaderMessage header(ChunkAck::kChunkAckMsgLength, 0, 0, fseq); >>> + header.Encode(reinterpret_cast<uint8_t*>(&data)); >>> + >>> + ChunkAck sack(svc_id, fseq, chksize); >>> + sack.Encode(reinterpret_cast<uint8_t*>(&data)); >>> + Send(reinterpret_cast<uint8_t*>(&data), >>> ChunkAck::kChunkAckMsgLength); >>> + m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], " >>> + "SndChkAck[fseq:%u, chunk:%u]", >>> + id_.node, id_.ref, >>> + fseq, chksize); >>> +} >>> + >>> +uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag, >>> + uint16_t fseq, uint16_t svc_id) { >>> + uint32_t rc = NCSCC_RC_SUCCESS; >>> + // update receiver sequence window >>> + if (rcvwnd_.acked_ < fseq && rcvwnd_.rcv_ + 1 == fseq) { >>> + m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], " >>> + "RcvData[mseq:%u, mfrag:%u, fseq:%u], " >>> + "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "]", >>> + id_.node, id_.ref, >>> + mseq, mfrag, fseq, >>> + rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_); >>> + >>> + ++rcvwnd_.rcv_; >>> + if (rcvwnd_.rcv_ - rcvwnd_.acked_ >= chunk_size_) { >>> + // send ack for @chunk_size_ msgs starting from fseq >>> + SendChunkAck(fseq, svc_id, chunk_size_); >>> + rcvwnd_.acked_ = rcvwnd_.rcv_; >>> + } >>> + } else { >>> + // todo: update rcvwnd_.nacked_space_. >>> + // This nacked_space_ will tell the number of bytes that has >>> not been acked >>> + // to the sender. If this nacked_space_ is growing large, and >>> approaching >>> + // the socket buffer size, the transmission of sender may be >>> queued. >>> + // this nacked_space_ can be used to detect if the >>> kChunkAckTimeout or >>> + // kChunkAckSize are set excessively large. >>> + // It is not used for now, so ignore it. >>> + >>> + // check for transmission error >>> + if (rcvwnd_.rcv_ + 1 < fseq) { >>> + if (rcvwnd_.rcv_ == 0 && rcvwnd_.acked_ == 0) { >>> + // peer does not realize that this portid reset >>> + m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], " >>> + "RcvData[mseq:%u, mfrag:%u, fseq:%u], " >>> + "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "], " >>> + "Warning[portid reset]", >>> + id_.node, id_.ref, >>> + mseq, mfrag, fseq, >>> + rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_); >>> + >>> + rcvwnd_.rcv_ = fseq; >>> + } else { >>> + rc = NCSCC_RC_FAILURE; >>> + // msg loss >>> + m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], " >>> + "RcvData[mseq:%u, mfrag:%u, fseq:%u], " >>> + "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "], " >>> + "Error[msg loss]", >>> + id_.node, id_.ref, >>> + mseq, mfrag, fseq, >>> + rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_); >>> + } >>> + } >>> + if (fseq <= rcvwnd_.acked_) { >>> + rc = NCSCC_RC_FAILURE; >>> + // unexpected retransmission >>> + m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], " >>> + "RcvData[mseq:%u, mfrag:%u, fseq:%u], " >>> + "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "], " >>> + "Error[unexpected retransmission]", >>> + id_.node, id_.ref, >>> + mseq, mfrag, fseq, >>> + rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_); >>> + } >>> + } >>> + return rc; >>> +} >>> + >>> +void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) { >>> + // update sender sequence window >>> + if (sndwnd_.acked_ < fseq && fseq < sndwnd_.send_) { >>> + m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], " >>> + "RcvChkAck[fseq:%u, chunk:%u], " >>> + "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "], " >>> + "queue[size:%" PRIu64 "]", >>> + id_.node, id_.ref, >>> + fseq, chksize, >>> + sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_, >>> + sndqueue_.Size()); >>> + >>> + // fast forward the sndwnd_.acked_ sequence to fseq >>> + sndwnd_.acked_ = fseq; >>> + >>> + // remove a number @chksize messages out of sndqueue_ and decrease >>> + // the nacked_space_ of sender >>> + sndwnd_.nacked_space_ -= sndqueue_.Erase(fseq - chksize + 1, >>> fseq); >>> + } else { >>> + m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], " >>> + "RcvChkAck[fseq:%u, chunk:%u], " >>> + "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "], " >>> + "queue[size:%" PRIu64 "], " >>> + "Error[msg disordered]", >>> + id_.node, id_.ref, >>> + fseq, chksize, >>> + sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_, >>> + sndqueue_.Size()); >>> + } >>> +} >>> + >>> +void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag, >>> + uint16_t fseq) { >>> + DataMessage* msg = sndqueue_.Find(mseq, mfrag); >>> + if (msg != nullptr) { >>> + // Resend the msg found >>> + Send(msg->msg_data_, msg->header_.msg_len_); >>> + m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], " >>> + "RsndData[mseq:%u, mfrag:%u, fseq:%u], " >>> + "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", >>> + id_.node, id_.ref, >>> + mseq, mfrag, fseq, >>> + sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_); >>> + } else { >>> + m_MDS_LOG_ERR("FCTRL: [me] --> [node:%x, ref:%u], " >>> + "RsndData[mseq:%u, mfrag:%u, fseq:%u], " >>> + "Error[msg not found]", >>> + id_.node, id_.ref, >>> + mseq, mfrag, fseq); >>> + } >>> +} >>> + >>> +} // end namespace mds >>> diff --git a/src/mds/mds_tipc_fctrl_portid.h >>> b/src/mds/mds_tipc_fctrl_portid.h >>> new file mode 100644 >>> index 0000000..8068e6e >>> --- /dev/null >>> +++ b/src/mds/mds_tipc_fctrl_portid.h >>> @@ -0,0 +1,87 @@ >>> +/* -*- OpenSAF -*- >>> + * >>> + * (C) Copyright 2019 The OpenSAF Foundation >>> + * >>> + * This program is distributed in the hope that it will be useful, but >>> + * WITHOUT ANY WARRANTY; without even the implied warranty of >>> MERCHANTABILITY >>> + * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are >>> licensed >>> + * under the GNU Lesser General Public License Version 2.1, >>> February 1999. >>> + * The complete license can be accessed from the following location: >>> + * >>> https://protect2.fireeye.com/url?k=226846f8-7ee1e993-22680663-0cc47ad93ea2-e9b41ced4c57c419&q=1&u=http%3A%2F%2Fopensource.org%2Flicenses%2Flgpl-license.php >>> + * See the Copying file included with the OpenSAF distribution for >>> full >>> + * licensing terms. >>> + * >>> + * Author(s): Ericsson AB >>> + * >>> + */ >>> + >>> +#ifndef MDS_MDS_TIPC_FCTRL_PORTID_H_ >>> +#define MDS_MDS_TIPC_FCTRL_PORTID_H_ >>> + >>> +#include <linux/tipc.h> >>> +#include <stdbool.h> >>> +#include <stdint.h> >>> +#include <stdio.h> >>> +#include <unistd.h> >>> +#include <deque> >>> +#include "mds/mds_tipc_fctrl_msg.h" >>> + >>> +namespace mds { >>> + >>> +class MessageQueue { >>> + public: >>> + void Queue(DataMessage* msg); >>> + DataMessage* Find(uint32_t mseq, uint16_t mfrag); >>> + uint64_t Erase(uint16_t fseq_from, uint16_t fseq_to); >>> + uint64_t Size() const { return queue_.size(); } >>> + void Clear(); >>> + private: >>> + std::deque<DataMessage*> queue_; >>> +}; >>> + >>> +class TipcPortId { >>> + public: >>> + TipcPortId(struct tipc_portid id, int sock, uint16_t chunk_size, >>> + uint64_t sock_buf_size); >>> + ~TipcPortId(); >>> + static uint64_t GetUniqueId(struct tipc_portid id); >>> + int GetSock() const { return bsrsock_; } >>> + uint16_t GetCurrentSeq() { return sndwnd_.send_; } >>> + bool ReceiveCapable(uint16_t sending_len); >>> + void ReceiveChunkAck(uint16_t fseq, uint16_t chunk_size); >>> + void SendChunkAck(uint16_t fseq, uint16_t svc_id, uint16_t >>> chunk_size); >>> + uint32_t ReceiveData(uint32_t mseq, uint16_t mfrag, >>> + uint16_t fseq, uint16_t svc_id); >>> + void ReceiveNack(uint32_t mseq, uint16_t mfrag, uint16_t fseq); >>> + uint32_t Send(uint8_t* data, uint16_t length); >>> + uint32_t Queue(const uint8_t* data, uint16_t length); >>> + >>> + uint16_t svc_cnt_{1}; // number of service subscribed on this >>> portid >>> + >>> + private: >>> + struct tipc_portid id_; >>> + int bsrsock_; // tipc socket to send/receive data per tipc_portid >>> + uint16_t chunk_size_{5}; >>> + uint64_t rcv_buf_size_{0}; // estimated buffer size at receiver >>> + >>> + struct sndwnd { >>> + // sender sequence window >>> + uint16_t acked_{0}; // last sequence has been acked by receiver >>> + uint16_t send_{1}; // next sequence to be sent >>> + uint64_t nacked_space_{0}; // total bytes are sent but not acked >>> + }; >>> + struct sndwnd sndwnd_; >>> + >>> + struct rcvwnd { >>> + // receiver sequence window >>> + uint16_t acked_{0}; // last sequence has been acked to sender >>> + uint16_t rcv_{0}; // last sequence has been received >>> + uint64_t nacked_space_{0}; // total bytes has not been acked >>> + }; >>> + struct rcvwnd rcvwnd_; >>> + >>> + MessageQueue sndqueue_; >>> +}; >>> + >>> +} // end namespace mds >>> +#endif // MDS_MDS_TIPC_FCTRL_PORTID_H_ _______________________________________________ Opensaf-devel mailing list Opensaf-devel@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/opensaf-devel