Hi Minh, this is a large patch, so I have to review it in parts. Below are my comments, marked with [HansN], for the following files:
src/mds/Makefile.am src/mds/mds_dt.h src/mds/mds_dt_tipc.c I'll continue with the rest of the files a bit later. /Thanks Hans On 2019-08-14 08:38, Minh Chau wrote: > This is a collaborative patch of two participants:Thuan, Minh. > > Main changes: > - Add mds_tipc_fctrl_intf.h, mds_tipc_fctrl_intf.cc: These two files > introduce new functions which are called in mds_dt_tipc.c if the flow > control is enabled > - Add mds_tipc_fctrl_portid.h, mds_tipc_fctrl_portid.cc: These files > implements the tipc portid instance, which supports the sliding window, > mds msg queue > - Add mds_tipc_fctrl_msg.h, mds_tipc_fctrl_msg.cc: These files define > the event and messages which are used for this solution. > --- > src/mds/Makefile.am | 10 +- > src/mds/mds_dt.h | 8 +- > src/mds/mds_dt_tipc.c | 188 +++++++++++++------- > src/mds/mds_tipc_fctrl_intf.cc | 376 > +++++++++++++++++++++++++++++++++++++++ > src/mds/mds_tipc_fctrl_intf.h | 47 +++++ > src/mds/mds_tipc_fctrl_msg.cc | 142 +++++++++++++++ > src/mds/mds_tipc_fctrl_msg.h | 129 ++++++++++++++ > src/mds/mds_tipc_fctrl_portid.cc | 261 +++++++++++++++++++++++++++ > src/mds/mds_tipc_fctrl_portid.h | 87 +++++++++ > 9 files changed, 1184 insertions(+), 64 deletions(-) > create mode 100644 src/mds/mds_tipc_fctrl_intf.cc > create mode 100644 src/mds/mds_tipc_fctrl_intf.h > create mode 100644 src/mds/mds_tipc_fctrl_msg.cc > create mode 100644 src/mds/mds_tipc_fctrl_msg.h > create mode 100644 src/mds/mds_tipc_fctrl_portid.cc > create mode 100644 src/mds/mds_tipc_fctrl_portid.h > > diff --git a/src/mds/Makefile.am b/src/mds/Makefile.am > index 2d7b652..d849e8f 100644 > --- a/src/mds/Makefile.am > +++ b/src/mds/Makefile.am > @@ -48,10 +48,16 @@ lib_libopensaf_core_la_SOURCES += \ > if ENABLE_TIPC_TRANSPORT > noinst_HEADERS += src/mds/mds_dt_tipc.h \ > src/mds/mds_tipc_recvq_stats.h \ > - src/mds/mds_tipc_recvq_stats_impl.h > + src/mds/mds_tipc_recvq_stats_impl.h \ > + src/mds/mds_tipc_fctrl_intf.h \ > + src/mds/mds_tipc_fctrl_portid.h \ > 
+ src/mds/mds_tipc_fctrl_msg.h > lib_libopensaf_core_la_SOURCES += src/mds/mds_dt_tipc.c \ > src/mds/mds_tipc_recvq_stats.cc \ > - src/mds/mds_tipc_recvq_stats_impl.cc > + src/mds/mds_tipc_recvq_stats_impl.cc \ > + src/mds/mds_tipc_fctrl_intf.cc \ > + src/mds/mds_tipc_fctrl_portid.cc \ > + src/mds/mds_tipc_fctrl_msg.cc > endif > > if ENABLE_TESTS > diff --git a/src/mds/mds_dt.h b/src/mds/mds_dt.h > index b645bb4..d9e8633 100644 > --- a/src/mds/mds_dt.h > +++ b/src/mds/mds_dt.h > @@ -162,7 +162,7 @@ uint32_t mdtm_del_from_ref_tbl(MDS_SUBTN_REF_VAL ref); > uint32_t mds_tmr_mailbox_processing(void); > uint32_t mdtm_get_from_ref_tbl(MDS_SUBTN_REF_VAL ref, MDS_SVC_HDL *svc_hdl); > uint32_t mdtm_add_frag_hdr(uint8_t *buf_ptr, uint16_t len, uint32_t seq_num, > - uint16_t frag_byte); > + uint16_t frag_byte, uint16_t fctrl_seq_num); > uint32_t mdtm_free_reassem_msg_mem(MDS_ENCODED_MSG *msg); > uint32_t mdtm_process_recv_data(uint8_t *buf, uint16_t len, uint64_t > tipc_id, > uint32_t *buff_dump); > @@ -240,9 +240,13 @@ bool mdtm_mailbox_mbx_cleanup(NCSCONTEXT arg, NCSCONTEXT > msg); > > #define MDS_PROT 0xA0 > #define MDS_VERSION 0x08 > -#define MDS_PROT_VER_MASK (MDS_PROT | MDS_VERSION) > +#define MDS_PROT_VER_MASK 0xFC > #define MDTM_PRI_MASK 0x3 > > +/* MDS protocol/version for flow control */ > +#define MDS_PROT_FCTRL (0xB0 | MDS_VERSION) > +#define MDS_PROT_FCTRL_ID 0x00AC13F5 > + > /* Added for the subscription changes */ > #define MDS_NCS_CHASSIS_ID (m_NCS_GET_NODE_ID & 0x00ff0000) > #define MDS_TIPC_COMMON_ID 0x01001000 > diff --git a/src/mds/mds_dt_tipc.c b/src/mds/mds_dt_tipc.c > index 86b52bb..fef1c50 100644 > --- a/src/mds/mds_dt_tipc.c > +++ b/src/mds/mds_dt_tipc.c > @@ -47,6 +47,7 @@ > #include "mds_dt_tipc.h" > #include "mds_dt_tcp_disc.h" > #include "mds_core.h" > +#include "mds_tipc_fctrl_intf.h" > #include "mds_tipc_recvq_stats.h" > #include "base/osaf_utility.h" > #include "base/osaf_poll.h" > @@ -165,20 +166,22 @@ NCS_PATRICIA_TREE mdtm_reassembly_list; > 
uint32_t mdtm_global_frag_num; > > const unsigned int MAX_RECV_THRESHOLD = 30; > +uint8_t gl_mds_pro_ver = MDS_PROT | MDS_VERSION; > > -static bool get_tipc_port_id(int sock, uint32_t* port_id) { > +static bool get_tipc_port_id(int sock, struct tipc_portid* port_id) { > struct sockaddr_tipc addr; > socklen_t sz = sizeof(addr); > > memset(&addr, 0, sizeof(addr)); > - *port_id = 0; > + port_id->node = 0; > + port_id->ref = 0; > if (0 > getsockname(sock, (struct sockaddr *)&addr, &sz)) { > syslog(LOG_ERR, "MDTM:TIPC Failed to get socket name, err: %s", > strerror(errno)); > return false; > } > > - *port_id = addr.addr.id.ref; > + *port_id = addr.addr.id; > return true; > } > > @@ -240,12 +243,13 @@ uint32_t mdtm_tipc_init(NODE_ID nodeid, uint32_t > *mds_tipc_ref) > } > > /* Code for getting the self tipc random number */ > - if (!get_tipc_port_id(tipc_cb.BSRsock, mds_tipc_ref)) { > + struct tipc_portid port_id; [HansN] mds_tipc_ref was previously set to 0 in get_tipc_port_id , this is now missing. 
> + if (!get_tipc_port_id(tipc_cb.BSRsock, &port_id)) { > close(tipc_cb.Dsock); > close(tipc_cb.BSRsock); > return NCSCC_RC_FAILURE; > } > - > + *mds_tipc_ref = port_id.ref; > tipc_cb.adest = ((uint64_t)(nodeid)) << 32; > tipc_cb.adest |= *mds_tipc_ref; > tipc_cb.node_id = nodeid; > @@ -325,6 +329,23 @@ uint32_t mdtm_tipc_init(NODE_ID nodeid, uint32_t > *mds_tipc_ref) > mdtm_set_transport(MDTM_TX_TYPE_TIPC); > } > > + /* Get tipc socket receive buffer size */ [HansN] int optval = 0; > + int optval; > + socklen_t optlen = sizeof(optval); > + if (getsockopt(tipc_cb.BSRsock, SOL_SOCKET, SO_RCVBUF, > + &optval, &optlen) != 0) { > + syslog(LOG_ERR, "MDTM: getsockopt() failed to get rcv buf size > to: %str", > + strerror(errno)); > + close(tipc_cb.Dsock); > + close(tipc_cb.BSRsock); > + return NCSCC_RC_FAILURE; > + } > + > + /* Create flow control tasks if enabled*/ > + gl_mds_pro_ver = MDS_PROT_FCTRL; [HansN] a question, optval is int (receive buffer size) and casted to uint64_t (the cast is not needed) in mds_tipc_fctrl_initialize, why use different types, int and uint64_t? 
> + mds_tipc_fctrl_initialize(tipc_cb.BSRsock, port_id, > + (uint64_t)optval, tipc_mcast_enabled); > + > /* Create a task to receive the events and data */ > if (mdtm_create_rcv_task(tipc_cb.hdle_mdtm) != NCSCC_RC_SUCCESS) { > syslog(LOG_ERR, > @@ -469,7 +490,7 @@ uint32_t mdtm_tipc_destroy(void) > NULL); > m_NCS_IPC_RELEASE(&tipc_cb.tmr_mbx, > (NCS_IPC_CB)mdtm_mailbox_mbx_cleanup); > - > + mds_tipc_fctrl_shutdown(); > /* Clear reference hdl list */ > while (mdtm_ref_hdl_list_hdr != NULL) { > /* Store temporary the pointer of entry to be deleted */ > @@ -672,36 +693,26 @@ ssize_t recvfrom_connectionless(int sd, void *buf, > size_t nbytes, int flags, > 0)); > if (anc_data[0] == TIPC_ERR_OVERLOAD) { > LOG_ER( > - "MDTM: From <0x%"PRIx32 > ":%"PRIu32 "> undeliverable message condition ancillary data: > TIPC_ERR_OVERLOAD", > - ((struct sockaddr_tipc*) > from)->addr.id.node, > - ((struct sockaddr_tipc*) > from)->addr.id.ref); > - m_MDS_LOG_CRITICAL( > "MDTM: From <0x%"PRIx32 > ":%"PRIu32 "> undeliverable message condition ancillary data: > TIPC_ERR_OVERLOAD ", > ((struct sockaddr_tipc*) > from)->addr.id.node, > ((struct sockaddr_tipc*) > from)->addr.id.ref); > - } else { > + } else if (anc_data[0] == > TIPC_ERR_NO_PORT){ > /* TIPC_ERRINFO - TIPC error > * code associated with a > * returned data message or a > * connection termination > * message */ > - m_MDS_LOG_DBG( > - "MDTM: undelivered message > condition ancillary data: TIPC_ERRINFO err : %d", > - anc_data[0]); > + > mds_tipc_fctrl_portid_terminate(((struct sockaddr_tipc*)from)->addr.id); > + } else { > + m_MDS_LOG_ERR("MDTM:: > TIPC_ERRINFO anc_data[0]:%u", anc_data[0]); > } > } else if (anc->cmsg_type == TIPC_RETDATA) { > - /* If we set TIPC_DEST_DROPPABLE off > - message (configure TIPC to return > - rejected messages to the sender ) we > - will hit this when we implement MDS > - retransmit lost messages, can be > - replaced with flow control logic */ > /* TIPC_RETDATA -The contents of a > * returned data 
message */ > - LOG_IN( > - "MDTM: undelivered message > condition ancillary data: TIPC_RETDATA"); > - m_MDS_LOG_INFO( > - "MDTM: undelivered message > condition ancillary data: TIPC_RETDATA"); > + LOG_IN("MDTM: undelivered message > condition ancillary data: TIPC_RETDATA"); > + uint16_t ret_msg_len = anc->cmsg_len - > sizeof(*anc); > + unsigned char *ret_msg = CMSG_DATA(anc); > + mds_tipc_fctrl_drop_data(ret_msg, > ret_msg_len, ((struct sockaddr_tipc*)from)->addr.id); > } else if (anc->cmsg_type == TIPC_DESTNAME) { > if (sz == 0) { > m_MDS_LOG_DBG( > @@ -822,7 +833,7 @@ static uint32_t mdtm_process_recv_events(void) > m_MDS_LOG_INFO( > "MDTM: > Published: "); > m_MDS_LOG_INFO( > - "MDTM: > <%u,%u,%u> port id <0x%08x:%u>\n", > + "MDTM: > <%x,%x,%x> port id <0x%08x:%u>\n", > NTOHL( > event.s > .seq > @@ -841,7 +852,12 @@ static uint32_t mdtm_process_recv_events(void) > event > > .port > > .ref)); > - > + struct > tipc_portid id = { > + .node = > NTOHL(event.port.node), > + .ref = > NTOHL(event.port.ref) > + }; > + uint32_t type = > NTOHL(event.s.seq.type); > + > mds_tipc_fctrl_portid_up(id, type); > if > (NCSCC_RC_SUCCESS != > > mdtm_process_discovery_events( > > TIPC_PUBLISHED, > @@ -873,7 +889,12 @@ static uint32_t mdtm_process_recv_events(void) > event > > .port > > .ref)); > - > + struct > tipc_portid id = { > + .node = > NTOHL(event.port.node), > + .ref = > NTOHL(event.port.ref) > + }; > + uint32_t type = > NTOHL(event.s.seq.type); > + > mds_tipc_fctrl_portid_down(id, type); > if > (NCSCC_RC_SUCCESS != > > mdtm_process_discovery_events( > > TIPC_WITHDRAWN, > @@ -986,8 +1007,6 @@ static uint32_t mdtm_process_recv_events(void) > break; > } > if (recd_bytes == 0) { > - m_MDS_LOG_DBG( > - "MDTM: recd bytes=0 on > received on sock, abnormal/unknown/hack condition. Ignoring"); > break; > } > data = inbuf; > @@ -1076,9 +1095,10 @@ static uint32_t mdtm_process_recv_events(void) > &buff_dump); [HansN] flow control seems not supported if MDS_CHECKSUM_ENABLE_FLAG is set? 
Either we document that flow control is not supported if MDS_CHECKSUM_ENABLE_FLAG is set or we support it? > } > #else > - mdtm_process_recv_data( > - &inbuf[2], recd_bytes - 2, > - tipc_id, &buff_dump); > + if > (mds_tipc_fctrl_rcv_data(inbuf, recd_bytes, client_addr.addr.id) > + == NCSCC_RC_SUCCESS) { > + > mdtm_process_recv_data(&inbuf[2], recd_bytes - 2, tipc_id, &buff_dump); > + } > #endif > } else { > uint64_t tipc_id; > @@ -1873,9 +1893,9 @@ uint32_t mds_mdtm_svc_install_tipc(PW_ENV_ID pwe_id, > MDS_SVC_ID svc_id, > server_addr.addr.nameseq.upper = server_inst; > > /* The self tipc random port number */ > - uint32_t port_id = 0; > + struct tipc_portid port_id; > get_tipc_port_id(tipc_cb.BSRsock, &port_id); > - m_MDS_LOG_NOTIFY("MDTM: install_tipc : <p:%u,s:%u,i:%u>", port_id, > + m_MDS_LOG_NOTIFY("MDTM: install_tipc : <p:%u,s:%u,i:%u>", port_id.ref, > server_type, server_inst); > > if (0 != bind(tipc_cb.BSRsock, (struct sockaddr *)&server_addr, > @@ -2495,6 +2515,7 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ *req) > */ > uint32_t status = 0; > uint32_t sum_mds_hdr_plus_mdtm_hdr_plus_len; [HansN] indentation is not correct > + uint16_t fctrl_seq_num = 0; > int version = req->msg_arch_word & 0x7; > if (version > 1) { > sum_mds_hdr_plus_mdtm_hdr_plus_len = > @@ -2583,11 +2604,16 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ *req) > mdtm_add_mds_hdr(buffer_ack, req)) { > return NCSCC_RC_FAILURE; > } > - > + /* if sndqueue is capable, then obtain the current sending > seq */ > + if (mds_tipc_fctrl_sndqueue_capable(tipc_id, len, > &fctrl_seq_num) > + == NCSCC_RC_FAILURE){ > + m_MDS_LOG_ERR("FCTRL: Failed to send message len :%d", len); > + return NCSCC_RC_FAILURE; > + } > /* Add frag_hdr */ > if (NCSCC_RC_SUCCESS != > mdtm_add_frag_hdr(buffer_ack, len, frag_seq_num, > - 0)) { > + 0, fctrl_seq_num)) { > return NCSCC_RC_FAILURE; > } > > @@ -2676,13 +2702,22 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ *req) > free(body); > return NCSCC_RC_FAILURE; > } > + /* if 
sndqueue is capable, then obtain the > current sending seq */ > + if (mds_tipc_fctrl_sndqueue_capable(tipc_id, > + len + > sum_mds_hdr_plus_mdtm_hdr_plus_len, > + &fctrl_seq_num) == NCSCC_RC_FAILURE){ > + m_MDS_LOG_ERR("FCTRL: Failed to send > message len :%d", > + len + > sum_mds_hdr_plus_mdtm_hdr_plus_len); > + free(body); > + return NCSCC_RC_FAILURE; > + } > > if (NCSCC_RC_SUCCESS != > mdtm_add_frag_hdr( > body, > (len + > sum_mds_hdr_plus_mdtm_hdr_plus_len), > - frag_seq_num, 0)) { > + frag_seq_num, 0, fctrl_seq_num)) { > m_MDS_LOG_ERR( > "MDTM: Unable to add the frag Hdr > to the send msg\n"); > m_MMGR_FREE_BUFR_LIST(usrbuf); > @@ -2778,12 +2813,23 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ *req) > req->msg.data.buff_info.buff); > return NCSCC_RC_FAILURE; > } > + /* if sndqueue is capable, then obtain the current > sending seq */ > + if (mds_tipc_fctrl_sndqueue_capable(tipc_id, > + req->msg.data.buff_info.len + > sum_mds_hdr_plus_mdtm_hdr_plus_len, > + &fctrl_seq_num) == NCSCC_RC_FAILURE) { > + m_MDS_LOG_ERR("FCTRL: Failed to send message > len :%d", > + req->msg.data.buff_info.len + > sum_mds_hdr_plus_mdtm_hdr_plus_len); > + free(body); > + > mds_free_direct_buff(req->msg.data.buff_info.buff); > + return NCSCC_RC_FAILURE; > + } > + > if (NCSCC_RC_SUCCESS != > mdtm_add_frag_hdr( > body, > req->msg.data.buff_info.len + > sum_mds_hdr_plus_mdtm_hdr_plus_len, > - frag_seq_num, 0)) { > + frag_seq_num, 0, fctrl_seq_num)) { > m_MDS_LOG_ERR( > "MDTM: Unable to add the frag Hdr to the > send msg\n"); > free(body); > @@ -2860,6 +2906,7 @@ uint32_t mdtm_frag_and_send(MDTM_SEND_REQ *req, > uint32_t seq_num, > uint16_t i = 1; > uint16_t frag_val = 0; > uint32_t sum_mds_hdr_plus_mdtm_hdr_plus_len; > + uint16_t fctrl_seq_num = 0; > int version = req->msg_arch_word & 0x7; > uint32_t ret = NCSCC_RC_SUCCESS; > > @@ -2938,9 +2985,18 @@ uint32_t mdtm_frag_and_send(MDTM_SEND_REQ *req, > uint32_t seq_num, > return NCSCC_RC_FAILURE; > } > } > + /* if sndqueue is capable, then 
obtain the current > sending seq */ > + if (mds_tipc_fctrl_sndqueue_capable(id, len_buf, > &fctrl_seq_num) > + == NCSCC_RC_FAILURE) { > + m_MDS_LOG_ERR("FCTRL: Failed to send message > len :%d", len_buf); > + m_MMGR_FREE_BUFR_LIST(usrbuf); > + free(body); > + return NCSCC_RC_FAILURE; > + } > + > if (NCSCC_RC_SUCCESS != > mdtm_add_frag_hdr(body, len_buf, seq_num, > - frag_val)) { > + frag_val, fctrl_seq_num)) { > m_MDS_LOG_ERR( > "MDTM: Frag hde addition failed\n"); > m_MMGR_FREE_BUFR_LIST(usrbuf); > @@ -2996,7 +3052,7 @@ uint32_t mdtm_frag_and_send(MDTM_SEND_REQ *req, > uint32_t seq_num, > *********************************************************/ > > uint32_t mdtm_add_frag_hdr(uint8_t *buf_ptr, uint16_t len, uint32_t seq_num, > - uint16_t frag_byte) > + uint16_t frag_byte, uint16_t fctrl_seq_num) > { > /* Add the FRAG HDR to the Buffer */ > uint8_t *data; > @@ -3013,9 +3069,17 @@ uint32_t mdtm_add_frag_hdr(uint8_t *buf_ptr, uint16_t > len, uint32_t seq_num, > 5); /* 2 bytes for keeping len, to cross > check at the receiver end */ > #else > - ncs_encode_16bit(&data, len - MDTM_FRAG_HDR_LEN - > - 2); /* 2 bytes for keeping len, to cross > - check at the receiver end */ > + /* Next 2 bytes for keeping len, to cross check at the receiver end. > + * Used to be encoded as below: > + * > + * ncs_encode_16bit(&data, len - MDTM_FRAG_HDR_LEN - 2); > + * > + * However, these 2 bytes have been never examined at receiver. 
As > backward > + * compatibility, the fragment header and mds data header can't be > extended, > + * hereafter, these 2 bytes will be used as sequence number in flow > control > + * (per tipc portid) > + * */ > + ncs_encode_16bit(&data, fctrl_seq_num); > #endif > return NCSCC_RC_SUCCESS; > } > @@ -3060,21 +3124,25 @@ static uint32_t mdtm_sendto(uint8_t *buffer, uint16_t > buff_len, > buffer[4] = checksum; > } > #endif > - > - send_len = sendto(tipc_cb.BSRsock, buffer, buff_len, 0, > - (struct sockaddr *)&server_addr, sizeof(server_addr)); > - if (send_len == buff_len) { > - m_MDS_LOG_INFO("MDTM: Successfully sent message"); > - return NCSCC_RC_SUCCESS; > - } else if (send_len == -1) { > - m_MDS_LOG_ERR("MDTM: Failed to send message err :%s", > - strerror(errno)); > - return NCSCC_RC_FAILURE; > - } else { > - m_MDS_LOG_ERR("MDTM: Failed to send message send_len :%zd", > - send_len); > - return NCSCC_RC_FAILURE; [HansN] NCSCC_RC_SUCCESS is returned even if no message has been sent as the return code from mds_tipc_fctrl_trysend only checks for NCSCC_RC_SUCCESS, else stmt missing? 
> + if (mds_tipc_fctrl_trysend(buffer, buff_len, id) == NCSCC_RC_SUCCESS) { > + send_len = sendto(tipc_cb.BSRsock, buffer, buff_len, 0, > + (struct sockaddr *)&server_addr, > sizeof(server_addr)); > + if (send_len == buff_len) { > + m_MDS_LOG_INFO("MDTM: Successfully sent message"); > + return NCSCC_RC_SUCCESS; > + } else if (send_len == -1) { > + m_MDS_LOG_ERR("MDTM: Failed to send message err :%s", > strerror(errno)); > + // todo: it's failed to send msg, if flow control is > enabled > + // the msg needs to mark unsent > + return NCSCC_RC_FAILURE; > + } else { > + m_MDS_LOG_ERR("MDTM: Failed to send message send_len > :%zd", send_len); > + // todo: it's failed to send msg, if flow control is > enabled > + // the msg needs to mark unsent > + return NCSCC_RC_FAILURE; > + } > } > + return NCSCC_RC_SUCCESS; > } > > /********************************************************* > @@ -3103,12 +3171,12 @@ static uint32_t mdtm_mcast_sendto(void *buffer, > size_t size, > server_addr.addr.nameseq.lower = HTONL(MDS_MDTM_LOWER_INSTANCE); > /*This can be scope-down to dest_svc_id server_inst TBD*/ > server_addr.addr.nameseq.upper = HTONL(MDS_MDTM_UPPER_INSTANCE); > - > int send_len = > sendto(tipc_cb.BSRsock, buffer, size, 0, > (struct sockaddr *)&server_addr, sizeof(server_addr)); > + > if (send_len == size) { > - m_MDS_LOG_INFO("MDTM: Successfully sent message"); > + m_MDS_LOG_INFO("MDTM: Successfully sent mcast message"); > return NCSCC_RC_SUCCESS; > } else { > m_MDS_LOG_ERR("MDTM: Failed to send Multicast message err :%s", > @@ -3151,7 +3219,7 @@ static uint32_t mdtm_add_mds_hdr(uint8_t *buffer, > MDTM_SEND_REQ *req) > uint8_t *ptr; > ptr = buffer; > > - prot_ver |= MDS_PROT | MDS_VERSION | ((uint8_t)(req->pri - 1)); > + prot_ver |= gl_mds_pro_ver | ((uint8_t)(req->pri - 1)); > enc_snd_type = (req->msg.encoding << 6); > enc_snd_type |= (uint8_t)req->snd_type; > > diff --git a/src/mds/mds_tipc_fctrl_intf.cc b/src/mds/mds_tipc_fctrl_intf.cc > new file mode 100644 > index 
0000000..91b9107 > --- /dev/null > +++ b/src/mds/mds_tipc_fctrl_intf.cc > @@ -0,0 +1,376 @@ > +/* -*- OpenSAF -*- > + * > + * (C) Copyright 2019 The OpenSAF Foundation > + * > + * This program is distributed in the hope that it will be useful, but > + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY > + * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed > + * under the GNU Lesser General Public License Version 2.1, February 1999. > + * The complete license can be accessed from the following location: > + * http://opensource.org/licenses/lgpl-license.php > + * See the Copying file included with the OpenSAF distribution for full > + * licensing terms. > + * > + * Author(s): Ericsson AB > + * > + */ > +#include "mds/mds_tipc_fctrl_intf.h" > + > +#include <poll.h> > +#include <pthread.h> > +#include <stdio.h> > +#include <sys/poll.h> > +#include <unistd.h> > + > +#include <map> > +#include <mutex> > + > +#include "base/ncssysf_def.h" > +#include "base/ncssysf_tsk.h" > + > +#include "mds/mds_log.h" > +#include "mds/mds_tipc_fctrl_portid.h" > +#include "mds/mds_tipc_fctrl_msg.h" > + > +using mds::Event; > +using mds::TipcPortId; > +using mds::DataMessage; > +using mds::ChunkAck; > +using mds::HeaderMessage; > + > +namespace { > +// multicast/broadcast enabled > +// todo: to be removed if flow control support it > +bool is_mcast_enabled = true; > + > +// mailbox handles for events > +SYSF_MBX mbx_events; > + > +// ncs task handle > +NCSCONTEXT p_task_hdl = nullptr; > + > +// data socket associated with port id > +int data_sock_fd = 0; > +struct tipc_portid snd_rcv_portid; > + > +// socket receiver's buffer size > +// todo: This buffer size is read from the local node as an estimated > +// buffer size of the receiver sides, in facts that could be different > +// at the receiver's sides (in the other nodes). 
At this moment, we > +// assume all nodes in cluster have set the same tipc buffer size > +uint64_t sock_buf_size = 0; > + > +// map of key:unique id(adest), value: TipcPortId instance > +// unique id is derived from struct tipc_portid > +std::map<uint64_t, TipcPortId*> portid_map; > +std::mutex portid_map_mutex; > + > +// chunk ack parameters > +// todo: The chunk ack size should be configurable > +uint16_t kChunkAckSize = 3; > + > +TipcPortId* portid_lookup(struct tipc_portid id) { > + uint64_t uid = TipcPortId::GetUniqueId(id); > + if (portid_map.find(uid) == portid_map.end()) return nullptr; > + return portid_map[uid]; > +} > + > +uint32_t process_flow_event(const Event evt) { > + uint32_t rc = NCSCC_RC_SUCCESS; > + TipcPortId *portid = portid_lookup(evt.id_); > + if (portid == nullptr) { > + if (evt.type_ == Event::Type::kEvtRcvData) { > + portid = new TipcPortId(evt.id_, data_sock_fd, kChunkAckSize, > + sock_buf_size); > + portid_map[TipcPortId::GetUniqueId(evt.id_)] = portid; > + if (evt.type_ == Event::Type::kEvtRcvData) { > + rc = portid->ReceiveData(evt.mseq_, evt.mfrag_, > + evt.fseq_, evt.svc_id_); > + } > + } > + } else { > + if (evt.type_ == Event::Type::kEvtRcvData) { > + rc = portid->ReceiveData(evt.mseq_, evt.mfrag_, > + evt.fseq_, evt.svc_id_); > + } > + if (evt.type_ == Event::Type::kEvtRcvChunkAck) { > + portid->ReceiveChunkAck(evt.fseq_, evt.chunk_size_); > + } > + if (evt.type_ == Event::Type::kEvtSendChunkAck) { > + portid->SendChunkAck(evt.fseq_, evt.svc_id_, evt.chunk_size_); > + } > + if (evt.type_ == Event::Type::kEvtDropData) { > + portid->ReceiveNack(evt.mseq_, evt.mfrag_, > + evt.fseq_); > + } > + } > + return rc; > +} > + > +uint32_t process_all_events(void) { > + enum { FD_FCTRL = 0, NUM_FDS }; > + > + int poll_tmo = MDTM_TIPC_POLL_TIMEOUT; > + while (true) { > + int pollres; > + struct pollfd pfd[NUM_FDS] = {{0}}; > + > + pfd[FD_FCTRL].fd = > + ncs_ipc_get_sel_obj(&mbx_events).rmv_obj; > + pfd[FD_FCTRL].events = POLLIN; > + > + 
pollres = poll(pfd, NUM_FDS, poll_tmo); > + > + if (pollres == -1) { > + if (errno == EINTR) continue; > + m_MDS_LOG_ERR("FCTRL: poll() failed:%s", strerror(errno)); > + break; > + } > + > + if (pollres > 0) { > + if (pfd[FD_FCTRL].revents == POLLIN) { > + Event *evt = reinterpret_cast<Event*>(ncs_ipc_non_blk_recv( > + &mbx_events)); > + > + if (evt == nullptr) continue; > + > + portid_map_mutex.lock(); > + process_flow_event(*evt); > + delete evt; > + portid_map_mutex.unlock(); > + } > + } > + } /* while */ > + return NCSCC_RC_SUCCESS; > +} > + > +uint32_t create_ncs_task(void *task_hdl) { > + int policy = SCHED_OTHER; /*root defaults */ > + int max_prio = sched_get_priority_max(policy); > + int min_prio = sched_get_priority_min(policy); > + int prio_val = ((max_prio - min_prio) * 0.87); > + > + if (m_NCS_IPC_CREATE(&mbx_events) != NCSCC_RC_SUCCESS) { > + m_MDS_LOG_ERR("m_NCS_IPC_CREATE failed"); > + return NCSCC_RC_FAILURE; > + } > + if (m_NCS_IPC_ATTACH(&mbx_events) != NCSCC_RC_SUCCESS) { > + m_MDS_LOG_ERR("m_NCS_IPC_ATTACH failed"); > + return NCSCC_RC_FAILURE; > + } > + if (ncs_task_create((NCS_OS_CB)process_all_events, 0, > + "OSAF_MDS", prio_val, policy, NCS_MDTM_STACKSIZE, > + &task_hdl) != NCSCC_RC_SUCCESS) { > + m_MDS_LOG_ERR("FCTRL: Task Creation-failed:\n"); > + return NCSCC_RC_FAILURE; > + } > + > + m_MDS_LOG_NOTIFY("FCTRL: Start process_all_events"); > + return NCSCC_RC_SUCCESS; > +} > + > +} // end local namespace > + > +uint32_t mds_tipc_fctrl_initialize(int dgramsock, struct tipc_portid id, > + uint64_t rcv_buf_size, bool mcast_enabled) { > + if (create_ncs_task(&p_task_hdl) != > + NCSCC_RC_SUCCESS) { > + m_MDS_LOG_ERR("FCTRL: Start of the Created Task-failed:\n"); > + return NCSCC_RC_FAILURE; > + } > + data_sock_fd = dgramsock; > + snd_rcv_portid = id; > + sock_buf_size = rcv_buf_size; > + is_mcast_enabled = mcast_enabled; > + > + m_MDS_LOG_NOTIFY("FCTRL: Initialize [node:%x, ref:%u]", > + id.node, id.ref); > + > + return NCSCC_RC_SUCCESS; > +} > 
+ > +uint32_t mds_tipc_fctrl_shutdown(void) { > + if (ncs_task_release(p_task_hdl) != NCSCC_RC_SUCCESS) { > + m_MDS_LOG_ERR("FCTRL: Stop of the Created Task-failed:\n"); > + } > + return NCSCC_RC_SUCCESS; > +} > + > +uint32_t mds_tipc_fctrl_sndqueue_capable(struct tipc_portid id, uint16_t len, > + uint16_t* next_seq) { > + uint32_t rc = NCSCC_RC_SUCCESS; > + > + portid_map_mutex.lock(); > + > + TipcPortId *portid = portid_lookup(id); > + if (portid == nullptr) { > + m_MDS_LOG_ERR("FCTRL: PortId not found [node:%x, ref:%u] line:%u", > + id.node, id.ref, __LINE__); > + rc = NCSCC_RC_FAILURE; > + } else { > + // assign the sequence number of the outgoing message > + *next_seq = portid->GetCurrentSeq(); > + } > + > + portid_map_mutex.unlock(); > + > + return rc; > +} > + > +uint32_t mds_tipc_fctrl_trysend(const uint8_t *buffer, uint16_t len, > + struct tipc_portid id) { > + uint32_t rc = NCSCC_RC_SUCCESS; > + > + portid_map_mutex.lock(); > + > + TipcPortId *portid = portid_lookup(id); > + if (portid == nullptr) { > + m_MDS_LOG_ERR("FCTRL: PortId not found [node:%x, ref:%u] line:%u", > + id.node, id.ref, __LINE__); > + rc = NCSCC_RC_FAILURE; > + } else { > + portid->Queue(buffer, len); > + } > + > + portid_map_mutex.unlock(); > + > + return rc; > +} > + > +uint32_t mds_tipc_fctrl_portid_up(struct tipc_portid id, uint32_t type) { > + MDS_SVC_ID svc_id = (uint16_t)(type & MDS_EVENT_MASK_FOR_SVCID); > + > + portid_map_mutex.lock(); > + > + // Add this new tipc portid to the map > + TipcPortId *portid = portid_lookup(id); > + uint64_t uid = TipcPortId::GetUniqueId(id); > + if (portid == nullptr) { > + portid_map[uid] = portid = new TipcPortId(id, data_sock_fd, > + kChunkAckSize, sock_buf_size); > + m_MDS_LOG_NOTIFY("FCTRL: Add portid[node:%x, ref:%u svc_id:%u], > svc_cnt:%u", > + id.node, id.ref, svc_id, portid->svc_cnt_); > + } else { > + portid->svc_cnt_++; > + m_MDS_LOG_NOTIFY("FCTRL: Add svc[node:%x, ref:%u svc_id:%u], svc_cnt:%u", > + id.node, id.ref, svc_id, 
portid->svc_cnt_); > + } > + > + portid_map_mutex.unlock(); > + > + return NCSCC_RC_SUCCESS; > +} > + > +uint32_t mds_tipc_fctrl_portid_down(struct tipc_portid id, uint32_t type) { > + MDS_SVC_ID svc_id = (uint16_t)(type & MDS_EVENT_MASK_FOR_SVCID); > + > + portid_map_mutex.lock(); > + > + // Delete this tipc portid out of the map > + TipcPortId *portid = portid_lookup(id); > + if (portid != nullptr) { > + portid->svc_cnt_--; > + m_MDS_LOG_NOTIFY("FCTRL: Remove svc[node:%x, ref:%u svc_id:%u], > svc_cnt:%u", > + id.node, id.ref, svc_id, portid->svc_cnt_); > + } > + portid_map_mutex.unlock(); > + > + return NCSCC_RC_SUCCESS; > +} > + > +uint32_t mds_tipc_fctrl_portid_terminate(struct tipc_portid id) { > + portid_map_mutex.lock(); > + > + // Delete this tipc portid out of the map > + TipcPortId *portid = portid_lookup(id); > + if (portid != nullptr) { > + delete portid; > + portid_map.erase(TipcPortId::GetUniqueId(id)); > + m_MDS_LOG_NOTIFY("FCTRL: Remove portid[node:%x, ref:%u]", id.node, > id.ref); > + } > + > + portid_map_mutex.unlock(); > + > + return NCSCC_RC_SUCCESS; > +} > + > +uint32_t mds_tipc_fctrl_drop_data(uint8_t *buffer, uint16_t len, > + struct tipc_portid id) { > + HeaderMessage header; > + header.Decode(buffer); > + // if mds support flow control > + if ((header.pro_ver_ & MDS_PROT_VER_MASK) == MDS_PROT_FCTRL) { > + if (header.pro_id_ == MDS_PROT_FCTRL_ID) { > + if (header.msg_type_ == ChunkAck::kChunkAckMsgType) { > + // receive single ack message > + ChunkAck ack; > + ack.Decode(buffer); > + // send to the event thread > + if (m_NCS_IPC_SEND(&mbx_events, > + new Event(Event::Type::kEvtSendChunkAck, id, ack.svc_id_, > + header.mseq_, header.mfrag_, ack.acked_fseq_, > ack.chunk_size_), > + NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) { > + m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events\n"); > + return NCSCC_RC_FAILURE; > + } > + return NCSCC_RC_SUCCESS; > + } > + } else { > + // receive data message > + DataMessage data; > + 
data.Decode(buffer); > + // send to the event thread > + if (m_NCS_IPC_SEND(&mbx_events, > + new Event(Event::Type::kEvtDropData, id, data.svc_id_, > + header.mseq_, header.mfrag_, header.fseq_), > + NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) { > + m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events\n"); > + return NCSCC_RC_FAILURE; > + } > + return NCSCC_RC_SUCCESS; > + } > + } > + > + return NCSCC_RC_SUCCESS; > +} > + > +uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t len, > + struct tipc_portid id) { > + HeaderMessage header; > + header.Decode(buffer); > + // if mds support flow control > + if ((header.pro_ver_ & MDS_PROT_VER_MASK) == MDS_PROT_FCTRL) { > + if (header.pro_id_ == MDS_PROT_FCTRL_ID) { > + if (header.msg_type_ == ChunkAck::kChunkAckMsgType) { > + // receive single ack message > + ChunkAck ack; > + ack.Decode(buffer); > + // send to the event thread > + if (m_NCS_IPC_SEND(&mbx_events, > + new Event(Event::Type::kEvtRcvChunkAck, id, ack.svc_id_, > + header.mseq_, header.mfrag_, ack.acked_fseq_, > ack.chunk_size_), > + NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) { > + m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events\n"); > + } > + // return NCSCC_RC_FAILURE, so the tipc receiving thread (legacy) > will > + // skip this data msg > + return NCSCC_RC_FAILURE; > + } > + } else { > + // receive data message > + DataMessage data; > + data.Decode(buffer); > + // todo: skip mcast/bcast, revisit > + if ((data.snd_type_ == MDS_SENDTYPE_BCAST || > + data.snd_type_ == MDS_SENDTYPE_RBCAST) && is_mcast_enabled) { > + return NCSCC_RC_SUCCESS; > + } > + portid_map_mutex.lock(); > + uint32_t rc = process_flow_event(Event(Event::Type::kEvtRcvData, > + id, data.svc_id_, header.mseq_, header.mfrag_, header.fseq_)); > + portid_map_mutex.unlock(); > + return rc; > + } > + } > + return NCSCC_RC_SUCCESS; > +} > diff --git a/src/mds/mds_tipc_fctrl_intf.h b/src/mds/mds_tipc_fctrl_intf.h > new file mode 100644 > index 0000000..85a058f > --- /dev/null > +++ 
b/src/mds/mds_tipc_fctrl_intf.h > @@ -0,0 +1,47 @@ > +/* -*- OpenSAF -*- > + * > + * (C) Copyright 2019 The OpenSAF Foundation > + * > + * This program is distributed in the hope that it will be useful, but > + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY > + * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed > + * under the GNU Lesser General Public License Version 2.1, February 1999. > + * The complete license can be accessed from the following location: > + * http://opensource.org/licenses/lgpl-license.php > + * See the Copying file included with the OpenSAF distribution for full > + * licensing terms. > + * > + * Author(s): Ericsson AB > + * > + */ > + > +#ifndef MDS_MDS_TIPC_FCTRL_INTF_H_ > +#define MDS_MDS_TIPC_FCTRL_INTF_H_ > + > +#include <linux/tipc.h> > +#include <stdbool.h> > +#include <stdint.h> > + > +#ifdef __cplusplus > +extern "C" { > +#endif > + > +uint32_t mds_tipc_fctrl_initialize(int dgramsock, struct tipc_portid id, > + uint64_t rcv_buf_size, bool mbrcast_enabled); > +uint32_t mds_tipc_fctrl_shutdown(void); > +uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t len, > + struct tipc_portid id); > +uint32_t mds_tipc_fctrl_portid_up(struct tipc_portid id, uint32_t type); > +uint32_t mds_tipc_fctrl_portid_down(struct tipc_portid id, uint32_t type); > +uint32_t mds_tipc_fctrl_portid_terminate(struct tipc_portid id); > +uint32_t mds_tipc_fctrl_drop_data(uint8_t *buffer, uint16_t len, > + struct tipc_portid id); > +uint32_t mds_tipc_fctrl_sndqueue_capable(struct tipc_portid id, uint16_t len, > + uint16_t* next_seq); > +uint32_t mds_tipc_fctrl_trysend(const uint8_t *buffer, uint16_t len, > + struct tipc_portid id); > +#ifdef __cplusplus > +} > +#endif > + > +#endif // MDS_MDS_TIPC_FCTRL_INTF_H_ > diff --git a/src/mds/mds_tipc_fctrl_msg.cc b/src/mds/mds_tipc_fctrl_msg.cc > new file mode 100644 > index 0000000..abd38d3 > --- /dev/null > +++ b/src/mds/mds_tipc_fctrl_msg.cc > @@ -0,0 +1,142 @@ > 
+/* -*- OpenSAF -*- > + * > + * (C) Copyright 2019 The OpenSAF Foundation > + * > + * This program is distributed in the hope that it will be useful, but > + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY > + * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed > + * under the GNU Lesser General Public License Version 2.1, February 1999. > + * The complete license can be accessed from the following location: > + * http://opensource.org/licenses/lgpl-license.php > + * See the Copying file included with the OpenSAF distribution for full > + * licensing terms. > + * > + * Author(s): Ericsson AB > + * > + */ > + > +#include "mds/mds_tipc_fctrl_msg.h" > +#include "base/ncssysf_def.h" > + > +namespace mds { > +HeaderMessage::HeaderMessage(uint16_t msg_len, uint32_t mseq, > + uint16_t mfrag, uint16_t fseq): msg_len_(msg_len), > + mseq_(mseq), mfrag_(mfrag), fseq_(fseq) { > + pro_ver_ = MDS_PROT_FCTRL; > + pro_id_ = 0; > + msg_type_ = 0; > +} > + > +void HeaderMessage::Encode(uint8_t *msg) { > + uint8_t *ptr; > + > + // encode message length > + ptr = &msg[0]; > + ncs_encode_16bit(&ptr, msg_len_); > + // encode sequence number > + ptr = &msg[2]; > + ncs_encode_32bit(&ptr, mseq_); > + // encode fragment number > + ptr = &msg[6]; > + ncs_encode_16bit(&ptr, mfrag_); > + // skip length_check: oct8&9 > + // encode protocol version > + ptr = &msg[10]; > + ncs_encode_8bit(&ptr, MDS_PROT_FCTRL); > +} > + > +void HeaderMessage::Decode(uint8_t *msg) { > + uint8_t *ptr; > + > + // decode message length > + ptr = &msg[0]; > + msg_len_ = ncs_decode_16bit(&ptr); > + // decode sequence number > + ptr = &msg[2]; > + mseq_ = ncs_decode_32bit(&ptr); > + // decode fragment number > + ptr = &msg[6]; > + mfrag_ = ncs_decode_16bit(&ptr); > + // decode protocol version > + ptr = &msg[10]; > + pro_ver_ = ncs_decode_8bit(&ptr); > + if ((pro_ver_ & MDS_PROT_VER_MASK) == MDS_PROT_FCTRL) { > + // decode flow control sequence number > + ptr = &msg[8]; 
> + fseq_ = ncs_decode_16bit(&ptr); > + // decode protocol identifier > + ptr = &msg[11]; > + pro_id_ = ncs_decode_32bit(&ptr); > + if (pro_id_ == MDS_PROT_FCTRL_ID) { > + // decode message type > + ptr = &msg[15]; > + msg_type_ = ncs_decode_8bit(&ptr); > + } > + } else { > + if (mfrag_ != 0) { > + ptr = &msg[8]; > + fseq_ = ncs_decode_16bit(&ptr); > + if (fseq_ != 0) pro_ver_ = MDS_PROT_FCTRL; > + } > + } > +} > + > +void DataMessage::Decode(uint8_t *msg) { > + uint8_t *ptr; > + > + // decode service id > + ptr = &msg[MDTM_PKT_TYPE_OFFSET + > + MDTM_FRAG_HDR_LEN + > + MDS_HEADER_RCVR_SVC_ID_POSITION]; > + svc_id_ = ncs_decode_16bit(&ptr); > + // decode snd_type > + ptr = &msg[17]; > + snd_type_ = (ncs_decode_8bit(&ptr)) & 0x3f; > +} > + > +DataMessage::~DataMessage() { > + if (msg_data_ != nullptr) { > + delete msg_data_; > + msg_data_ = nullptr; > + } > +} > + > +ChunkAck::ChunkAck(uint16_t svc_id, uint16_t fseq, uint16_t chunk_size): > + svc_id_(svc_id), acked_fseq_(fseq), chunk_size_(chunk_size) { > + msg_type_ = kChunkAckMsgType; > +} > + > +void ChunkAck::Encode(uint8_t *msg) { > + uint8_t *ptr; > + // encode protocol identifier > + ptr = &msg[11]; > + ncs_encode_32bit(&ptr, MDS_PROT_FCTRL_ID); > + // encode message type > + ptr = &msg[15]; > + ncs_encode_8bit(&ptr, kChunkAckMsgType); > + // encode service id > + ptr = &msg[16]; > + ncs_encode_16bit(&ptr, svc_id_); > + // encode flow control sequence number > + ptr = &msg[18]; > + ncs_encode_16bit(&ptr, acked_fseq_); > + // encode chunk size > + ptr = &msg[20]; > + ncs_encode_16bit(&ptr, chunk_size_); > +} > + > +void ChunkAck::Decode(uint8_t *msg) { > + uint8_t *ptr; > + > + // decode service id > + ptr = &msg[16]; > + svc_id_ = ncs_decode_16bit(&ptr); > + // decode flow control sequence number > + ptr = &msg[18]; > + acked_fseq_ = ncs_decode_16bit(&ptr); > + // decode chunk size > + ptr = &msg[20]; > + chunk_size_ = ncs_decode_16bit(&ptr); > +} > + > +} // end namespace mds > diff --git 
a/src/mds/mds_tipc_fctrl_msg.h b/src/mds/mds_tipc_fctrl_msg.h > new file mode 100644 > index 0000000..677f256 > --- /dev/null > +++ b/src/mds/mds_tipc_fctrl_msg.h > @@ -0,0 +1,129 @@ > +/* -*- OpenSAF -*- > + * > + * (C) Copyright 2019 The OpenSAF Foundation > + * > + * This program is distributed in the hope that it will be useful, but > + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY > + * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed > + * under the GNU Lesser General Public License Version 2.1, February 1999. > + * The complete license can be accessed from the following location: > + * http://opensource.org/licenses/lgpl-license.php > + * See the Copying file included with the OpenSAF distribution for full > + * licensing terms. > + * > + * Author(s): Ericsson AB > + * > + */ > + > +#ifndef MDS_MDS_TIPC_FCTRL_MSG_H_ > +#define MDS_MDS_TIPC_FCTRL_MSG_H_ > + > +#include <linux/tipc.h> > +#include "base/ncssysf_ipc.h" > +#include "base/ncssysf_tmr.h" > +#include "mds/mds_dt.h" > + > +namespace mds { > + > +class Event { > + public: > + enum class Type { > + kEvtPortIdAll = 0, > + kEvtPortIdUp, // event of new portid published (not used) > + kEvtPortIdDown, // event of portid withdrawn (not used) > + > + kEvtDataFlowAll, > + kEvtRcvData, // event that received data msg from peer > + kEvtSendChunkAck, // event to send the ack for a chunk of N accumulative > + // data msgs (N>=1) > + kEvtRcvChunkAck, // event that received the ack for a chunk of N > + // accumulative data msgs (N>=1) > + kEvtSendSelectiveAck, // event to send the ack for a number of selective > + // data msgs (not supported) > + kEvtRcvSelectiveAck, // event that received the ack for a number of > + // selective data msgs (not supported) > + kEvtDropData, // event reported from tipc that a message is not > + // delivered > + }; > + NCS_IPC_MSG next_{0}; > + Type type_; > + > + // Used for flow event only > + struct tipc_portid id_; > + uint16_t 
svc_id_{0}; > + uint32_t mseq_{0}; > + uint16_t mfrag_{0}; > + uint16_t fseq_{0}; > + uint16_t chunk_size_{1}; > + explicit Event(Type type):type_(type) {} > + Event(Type type, struct tipc_portid id, uint16_t svc_id, > + uint32_t mseq, uint16_t mfrag, uint16_t f_seg_num): > + id_(id), svc_id_(svc_id), > + mseq_(mseq), mfrag_(mfrag), fseq_(f_seg_num) { > + type_ = type; > + } > + Event(Type type, struct tipc_portid id, uint16_t svc_id, uint32_t mseq, > + uint16_t mfrag, uint16_t f_seg_num, uint16_t chunk_size): > + id_(id), svc_id_(svc_id), mseq_(mseq), mfrag_(mfrag), > + fseq_(f_seg_num), chunk_size_(chunk_size) { > + type_ = type; > + } > +}; > + > +class BaseMessage { > + public: > + virtual ~BaseMessage() {} > + virtual void Decode(uint8_t *msg) {} > + virtual void Encode(uint8_t *msg) {} > +}; > + > +class HeaderMessage: public BaseMessage { > + public: > + uint8_t* msg_ptr_{nullptr}; > + uint16_t msg_len_{0}; > + uint32_t mseq_{0}; > + uint16_t mfrag_{0}; > + uint16_t fseq_{0}; > + uint8_t pro_ver_{0}; > + uint32_t pro_id_{0}; > + uint16_t msg_type_{0}; > + HeaderMessage() {} > + HeaderMessage(uint16_t msg_len, uint32_t mseq, uint16_t mfrag, > + uint16_t fseq); > + virtual ~HeaderMessage() {} > + void Decode(uint8_t *msg) override; > + void Encode(uint8_t *msg) override; > +}; > + > +class DataMessage: public BaseMessage { > + public: > + HeaderMessage header_; > + uint16_t svc_id_{0}; > + > + uint8_t* msg_data_{nullptr}; > + uint8_t snd_type_{0}; > + > + DataMessage() {} > + virtual ~DataMessage(); > + void Decode(uint8_t *msg) override; > +}; > + > +class ChunkAck: public BaseMessage { > + public: > + static const uint8_t kChunkAckMsgType = 1; > + static const uint16_t kChunkAckMsgLength = 22; > + > + uint8_t msg_type_{0}; > + uint16_t svc_id_{0}; > + uint16_t acked_fseq_{0}; > + uint16_t chunk_size_{1}; > + ChunkAck() {} > + ChunkAck(uint16_t svc_id, uint16_t fseq, uint16_t chunk_size); > + virtual ~ChunkAck() {} > + void Encode(uint8_t *msg) override; > + 
void Decode(uint8_t *msg) override; > +}; > + > +} // end namespace mds > + > +#endif // MDS_MDS_TIPC_FCTRL_MSG_H_ > diff --git a/src/mds/mds_tipc_fctrl_portid.cc > b/src/mds/mds_tipc_fctrl_portid.cc > new file mode 100644 > index 0000000..24d13ee > --- /dev/null > +++ b/src/mds/mds_tipc_fctrl_portid.cc > @@ -0,0 +1,261 @@ > +/* -*- OpenSAF -*- > + * > + * (C) Copyright 2019 The OpenSAF Foundation > + * > + * This program is distributed in the hope that it will be useful, but > + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY > + * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed > + * under the GNU Lesser General Public License Version 2.1, February 1999. > + * The complete license can be accessed from the following location: > + * http://opensource.org/licenses/lgpl-license.php > + * See the Copying file included with the OpenSAF distribution for full > + * licensing terms. > + * > + * Author(s): Ericsson AB > + * > + */ > + > +#include "mds/mds_tipc_fctrl_portid.h" > +#include "base/ncssysf_def.h" > + > +#include "mds/mds_dt.h" > +#include "mds/mds_log.h" > + > +namespace mds { > + > +void MessageQueue::Queue(DataMessage* msg) { > + queue_.push_back(msg); > +} > + > +DataMessage* MessageQueue::Find(uint32_t mseq, uint16_t mfrag) { > + for (auto it = queue_.begin(); it != queue_.end(); ++it) { > + DataMessage *m = *it; > + if (m->header_.mseq_ == mseq && m->header_.mfrag_ == mfrag) { > + return m; > + } > + } > + return nullptr; > +} > + > +uint64_t MessageQueue::Erase(uint16_t fseq_from, uint16_t fseq_to) { > + uint64_t msg_len = 0; > + for (auto it = queue_.begin(); it != queue_.end();) { > + DataMessage *m = *it; > + if (fseq_from <= m->header_.fseq_ && > + m->header_.fseq_ <= fseq_to) { > + msg_len += m->header_.msg_len_; > + it = queue_.erase(it); > + delete m; > + } else { > + it++; > + } > + } > + return msg_len; > +} > + > +void MessageQueue::Clear() { > + while (queue_.empty() == false) { > + 
DataMessage* msg = queue_.front(); > + queue_.pop_front(); > + delete msg; > + } > +} > + > +TipcPortId::TipcPortId(struct tipc_portid id, int sock, uint16_t chksize, > + uint64_t sock_buf_size): > + id_(id), bsrsock_(sock), chunk_size_(chksize), > rcv_buf_size_(sock_buf_size) { > +} > + > +TipcPortId::~TipcPortId() { > + // clear all msg in sndqueue_ > + sndqueue_.Clear(); > +} > + > +uint64_t TipcPortId::GetUniqueId(struct tipc_portid id) { > + // this uid is equivalent to the mds adest > + uint64_t uid = ((uint64_t)id.node << 32) | (uint64_t)id.ref; > + return uid; > +} > + > +uint32_t TipcPortId::Send(uint8_t* data, uint16_t length) { > + struct sockaddr_tipc server_addr; > + ssize_t send_len = 0; > + uint32_t rc = NCSCC_RC_SUCCESS; > + > + memset(&server_addr, 0, sizeof(server_addr)); > + server_addr.family = AF_TIPC; > + server_addr.addrtype = TIPC_ADDR_ID; > + server_addr.addr.id = id_; > + send_len = sendto(bsrsock_, data, length, 0, > + (struct sockaddr *)&server_addr, sizeof(server_addr)); > + > + if (send_len == length) { > + rc = NCSCC_RC_SUCCESS; > + } else { > + m_MDS_LOG_ERR("sendto() err :%s", strerror(errno)); > + rc = NCSCC_RC_FAILURE; > + } > + return rc; > +} > + > +uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length) { > + uint32_t rc = NCSCC_RC_SUCCESS; > + > + DataMessage *msg = new DataMessage; > + msg->header_.Decode(const_cast<uint8_t*>(data)); > + msg->Decode(const_cast<uint8_t*>(data)); > + msg->msg_data_ = new uint8_t[length]; > + memcpy(msg->msg_data_, data, length); > + sndqueue_.Queue(msg); > + ++sndwnd_.send_; > + sndwnd_.nacked_space_ += length; > + m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], " > + "SndData[mseq:%u, mfrag:%u, fseq:%u, len:%u], " > + "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", > + id_.node, id_.ref, > + msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_, length, > + sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_); > + > + return rc; > +} > + > +bool 
TipcPortId::ReceiveCapable(uint16_t sending_len) { > + return true; > +} > + > +void TipcPortId::SendChunkAck(uint16_t fseq, uint16_t svc_id, > + uint16_t chksize) { > + uint8_t data[ChunkAck::kChunkAckMsgLength]; > + > + HeaderMessage header(ChunkAck::kChunkAckMsgLength, 0, 0, fseq); > + header.Encode(reinterpret_cast<uint8_t*>(&data)); > + > + ChunkAck sack(svc_id, fseq, chksize); > + sack.Encode(reinterpret_cast<uint8_t*>(&data)); > + Send(reinterpret_cast<uint8_t*>(&data), ChunkAck::kChunkAckMsgLength); > + m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], " > + "SndChkAck[fseq:%u, chunk:%u]", > + id_.node, id_.ref, > + fseq, chksize); > +} > + > +uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag, > + uint16_t fseq, uint16_t svc_id) { > + uint32_t rc = NCSCC_RC_SUCCESS; > + // update receiver sequence window > + if (rcvwnd_.acked_ < fseq && rcvwnd_.rcv_ + 1 == fseq) { > + m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], " > + "RcvData[mseq:%u, mfrag:%u, fseq:%u], " > + "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "]", > + id_.node, id_.ref, > + mseq, mfrag, fseq, > + rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_); > + > + ++rcvwnd_.rcv_; > + if (rcvwnd_.rcv_ - rcvwnd_.acked_ >= chunk_size_) { > + // send ack for @chunk_size_ msgs ending at fseq > + SendChunkAck(fseq, svc_id, chunk_size_); > + rcvwnd_.acked_ = rcvwnd_.rcv_; > + } > + } else { > + // todo: update rcvwnd_.nacked_space_. > + // This nacked_space_ will tell the number of bytes that have not been acked > + // to the sender. If this nacked_space_ is growing large, and approaching > + // the socket buffer size, the transmission of sender may be queued. > + // this nacked_space_ can be used to detect if the kChunkAckTimeout or > + // kChunkAckSize are set excessively large. > + // It is not used for now, so ignore it. 
> + > + // check for transmission error > + if (rcvwnd_.rcv_ + 1 < fseq) { > + if (rcvwnd_.rcv_ == 0 && rcvwnd_.acked_ == 0) { > + // peer does not realize that this portid reset > + m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], " > + "RcvData[mseq:%u, mfrag:%u, fseq:%u], " > + "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "], " > + "Warning[portid reset]", > + id_.node, id_.ref, > + mseq, mfrag, fseq, > + rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_); > + > + rcvwnd_.rcv_ = fseq; > + } else { > + rc = NCSCC_RC_FAILURE; > + // msg loss > + m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], " > + "RcvData[mseq:%u, mfrag:%u, fseq:%u], " > + "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "], " > + "Error[msg loss]", > + id_.node, id_.ref, > + mseq, mfrag, fseq, > + rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_); > + } > + } > + if (fseq <= rcvwnd_.acked_) { > + rc = NCSCC_RC_FAILURE; > + // unexpected retransmission > + m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], " > + "RcvData[mseq:%u, mfrag:%u, fseq:%u], " > + "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "], " > + "Error[unexpected retransmission]", > + id_.node, id_.ref, > + mseq, mfrag, fseq, > + rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_); > + } > + } > + return rc; > +} > + > +void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) { > + // update sender sequence window > + if (sndwnd_.acked_ < fseq && fseq < sndwnd_.send_) { > + m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], " > + "RcvChkAck[fseq:%u, chunk:%u], " > + "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "], " > + "queue[size:%" PRIu64 "]", > + id_.node, id_.ref, > + fseq, chksize, > + sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_, > + sndqueue_.Size()); > + > + // fast forward the sndwnd_.acked_ sequence to fseq > + sndwnd_.acked_ = fseq; > + > + // remove a number @chksize messages out of sndqueue_ and decrease > + // the nacked_space_ of sender > + sndwnd_.nacked_space_ -= sndqueue_.Erase(fseq - chksize + 1, 
fseq); > + } else { > + m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], " > + "RcvChkAck[fseq:%u, chunk:%u], " > + "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "], " > + "queue[size:%" PRIu64 "], " > + "Error[msg disordered]", > + id_.node, id_.ref, > + fseq, chksize, > + sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_, > + sndqueue_.Size()); > + } > +} > + > +void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag, > + uint16_t fseq) { > + DataMessage* msg = sndqueue_.Find(mseq, mfrag); > + if (msg != nullptr) { > + // Resend the msg found > + Send(msg->msg_data_, msg->header_.msg_len_); > + m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], " > + "RsndData[mseq:%u, mfrag:%u, fseq:%u], " > + "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", > + id_.node, id_.ref, > + mseq, mfrag, fseq, > + sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_); > + } else { > + m_MDS_LOG_ERR("FCTRL: [me] --> [node:%x, ref:%u], " > + "RsndData[mseq:%u, mfrag:%u, fseq:%u], " > + "Error[msg not found]", > + id_.node, id_.ref, > + mseq, mfrag, fseq); > + } > +} > + > +} // end namespace mds > diff --git a/src/mds/mds_tipc_fctrl_portid.h b/src/mds/mds_tipc_fctrl_portid.h > new file mode 100644 > index 0000000..8068e6e > --- /dev/null > +++ b/src/mds/mds_tipc_fctrl_portid.h > @@ -0,0 +1,87 @@ > +/* -*- OpenSAF -*- > + * > + * (C) Copyright 2019 The OpenSAF Foundation > + * > + * This program is distributed in the hope that it will be useful, but > + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY > + * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed > + * under the GNU Lesser General Public License Version 2.1, February 1999. > + * The complete license can be accessed from the following location: > + * http://opensource.org/licenses/lgpl-license.php > + * See the Copying file included with the OpenSAF distribution for full > + * licensing terms. 
> + * > + * Author(s): Ericsson AB > + * > + */ > + > +#ifndef MDS_MDS_TIPC_FCTRL_PORTID_H_ > +#define MDS_MDS_TIPC_FCTRL_PORTID_H_ > + > +#include <linux/tipc.h> > +#include <stdbool.h> > +#include <stdint.h> > +#include <stdio.h> > +#include <unistd.h> > +#include <deque> > +#include "mds/mds_tipc_fctrl_msg.h" > + > +namespace mds { > + > +class MessageQueue { > + public: > + void Queue(DataMessage* msg); > + DataMessage* Find(uint32_t mseq, uint16_t mfrag); > + uint64_t Erase(uint16_t fseq_from, uint16_t fseq_to); > + uint64_t Size() const { return queue_.size(); } > + void Clear(); > + private: > + std::deque<DataMessage*> queue_; > +}; > + > +class TipcPortId { > + public: > + TipcPortId(struct tipc_portid id, int sock, uint16_t chunk_size, > + uint64_t sock_buf_size); > + ~TipcPortId(); > + static uint64_t GetUniqueId(struct tipc_portid id); > + int GetSock() const { return bsrsock_; } > + uint16_t GetCurrentSeq() { return sndwnd_.send_; } > + bool ReceiveCapable(uint16_t sending_len); > + void ReceiveChunkAck(uint16_t fseq, uint16_t chunk_size); > + void SendChunkAck(uint16_t fseq, uint16_t svc_id, uint16_t chunk_size); > + uint32_t ReceiveData(uint32_t mseq, uint16_t mfrag, > + uint16_t fseq, uint16_t svc_id); > + void ReceiveNack(uint32_t mseq, uint16_t mfrag, uint16_t fseq); > + uint32_t Send(uint8_t* data, uint16_t length); > + uint32_t Queue(const uint8_t* data, uint16_t length); > + > + uint16_t svc_cnt_{1}; // number of service subscribed on this portid > + > + private: > + struct tipc_portid id_; > + int bsrsock_; // tipc socket to send/receive data per tipc_portid > + uint16_t chunk_size_{5}; > + uint64_t rcv_buf_size_{0}; // estimated buffer size at receiver > + > + struct sndwnd { > + // sender sequence window > + uint16_t acked_{0}; // last sequence has been acked by receiver > + uint16_t send_{1}; // next sequence to be sent > + uint64_t nacked_space_{0}; // total bytes are sent but not acked > + }; > + struct sndwnd sndwnd_; > + > + struct 
rcvwnd { > + // receiver sequence window > + uint16_t acked_{0}; // last sequence has been acked to sender > + uint16_t rcv_{0}; // last sequence has been received > + uint64_t nacked_space_{0}; // total bytes has not been acked > + }; > + struct rcvwnd rcvwnd_; > + > + MessageQueue sndqueue_; > +}; > + > +} // end namespace mds > +#endif // MDS_MDS_TIPC_FCTRL_PORTID_H_ _______________________________________________ Opensaf-devel mailing list Opensaf-devel@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/opensaf-devel