This patch is a collaboration between two contributors: - Tran Thuan <thuan.t...@dektech.com.au> - Minh Chau <minh.c...@dektech.com.au>
Main changes: - Add mds_tipc_fctrl_intf.h, mds_tipc_fctrl_intf.cc: these two files introduce new functions that are called from mds_dt_tipc.c when flow control is enabled. - Add mds_tipc_fctrl_portid.h, mds_tipc_fctrl_portid.cc: these files implement the TIPC portid instance, which supports the sliding window and the MDS message queue. - Add mds_tipc_fctrl_msg.h, mds_tipc_fctrl_msg.cc: these files define the events and messages used by this solution. --- src/mds/Makefile.am | 10 +- src/mds/mds_dt.h | 8 +- src/mds/mds_dt_tipc.c | 188 +++++++++++++------- src/mds/mds_tipc_fctrl_intf.cc | 376 +++++++++++++++++++++++++++++++++++++++ src/mds/mds_tipc_fctrl_intf.h | 47 +++++ src/mds/mds_tipc_fctrl_msg.cc | 142 +++++++++++++++ src/mds/mds_tipc_fctrl_msg.h | 129 ++++++++++++++ src/mds/mds_tipc_fctrl_portid.cc | 261 +++++++++++++++++++++++++++ src/mds/mds_tipc_fctrl_portid.h | 87 +++++++++ 9 files changed, 1184 insertions(+), 64 deletions(-) create mode 100644 src/mds/mds_tipc_fctrl_intf.cc create mode 100644 src/mds/mds_tipc_fctrl_intf.h create mode 100644 src/mds/mds_tipc_fctrl_msg.cc create mode 100644 src/mds/mds_tipc_fctrl_msg.h create mode 100644 src/mds/mds_tipc_fctrl_portid.cc create mode 100644 src/mds/mds_tipc_fctrl_portid.h diff --git a/src/mds/Makefile.am b/src/mds/Makefile.am index 2d7b652..d849e8f 100644 --- a/src/mds/Makefile.am +++ b/src/mds/Makefile.am @@ -48,10 +48,16 @@ lib_libopensaf_core_la_SOURCES += \ if ENABLE_TIPC_TRANSPORT noinst_HEADERS += src/mds/mds_dt_tipc.h \ src/mds/mds_tipc_recvq_stats.h \ - src/mds/mds_tipc_recvq_stats_impl.h + src/mds/mds_tipc_recvq_stats_impl.h \ + src/mds/mds_tipc_fctrl_intf.h \ + src/mds/mds_tipc_fctrl_portid.h \ + src/mds/mds_tipc_fctrl_msg.h lib_libopensaf_core_la_SOURCES += src/mds/mds_dt_tipc.c \ src/mds/mds_tipc_recvq_stats.cc \ - src/mds/mds_tipc_recvq_stats_impl.cc + src/mds/mds_tipc_recvq_stats_impl.cc \ + src/mds/mds_tipc_fctrl_intf.cc \ + src/mds/mds_tipc_fctrl_portid.cc \ + src/mds/mds_tipc_fctrl_msg.cc endif 
if ENABLE_TESTS diff --git a/src/mds/mds_dt.h b/src/mds/mds_dt.h index b645bb4..d9e8633 100644 --- a/src/mds/mds_dt.h +++ b/src/mds/mds_dt.h @@ -162,7 +162,7 @@ uint32_t mdtm_del_from_ref_tbl(MDS_SUBTN_REF_VAL ref); uint32_t mds_tmr_mailbox_processing(void); uint32_t mdtm_get_from_ref_tbl(MDS_SUBTN_REF_VAL ref, MDS_SVC_HDL *svc_hdl); uint32_t mdtm_add_frag_hdr(uint8_t *buf_ptr, uint16_t len, uint32_t seq_num, - uint16_t frag_byte); + uint16_t frag_byte, uint16_t fctrl_seq_num); uint32_t mdtm_free_reassem_msg_mem(MDS_ENCODED_MSG *msg); uint32_t mdtm_process_recv_data(uint8_t *buf, uint16_t len, uint64_t tipc_id, uint32_t *buff_dump); @@ -240,9 +240,13 @@ bool mdtm_mailbox_mbx_cleanup(NCSCONTEXT arg, NCSCONTEXT msg); #define MDS_PROT 0xA0 #define MDS_VERSION 0x08 -#define MDS_PROT_VER_MASK (MDS_PROT | MDS_VERSION) +#define MDS_PROT_VER_MASK 0xFC /* every bit except the MDTM_PRI_MASK priority bits */ #define MDTM_PRI_MASK 0x3 +/* MDS protocol/version for flow control */ +#define MDS_PROT_FCTRL (0xB0 | MDS_VERSION) +#define MDS_PROT_FCTRL_ID 0x00AC13F5 /* tags flow control protocol messages (e.g. ChunkAck) */ + /* Added for the subscription changes */ #define MDS_NCS_CHASSIS_ID (m_NCS_GET_NODE_ID & 0x00ff0000) #define MDS_TIPC_COMMON_ID 0x01001000 diff --git a/src/mds/mds_dt_tipc.c b/src/mds/mds_dt_tipc.c index 86b52bb..fef1c50 100644 --- a/src/mds/mds_dt_tipc.c +++ b/src/mds/mds_dt_tipc.c @@ -47,6 +47,7 @@ #include "mds_dt_tipc.h" #include "mds_dt_tcp_disc.h" #include "mds_core.h" +#include "mds_tipc_fctrl_intf.h" #include "mds_tipc_recvq_stats.h" #include "base/osaf_utility.h" #include "base/osaf_poll.h" @@ -165,20 +166,22 @@ NCS_PATRICIA_TREE mdtm_reassembly_list; uint32_t mdtm_global_frag_num; const unsigned int MAX_RECV_THRESHOLD = 30; +uint8_t gl_mds_pro_ver = MDS_PROT | MDS_VERSION; -static bool get_tipc_port_id(int sock, uint32_t* port_id) { +static bool get_tipc_port_id(int sock, struct tipc_portid* port_id) { struct sockaddr_tipc addr; socklen_t sz = sizeof(addr); memset(&addr, 0, sizeof(addr)); - *port_id = 0; + port_id->node = 0; + port_id->ref = 0; if (0 > 
getsockname(sock, (struct sockaddr *)&addr, &sz)) { syslog(LOG_ERR, "MDTM:TIPC Failed to get socket name, err: %s", strerror(errno)); return false; } - *port_id = addr.addr.id.ref; + *port_id = addr.addr.id; return true; } @@ -240,12 +243,13 @@ uint32_t mdtm_tipc_init(NODE_ID nodeid, uint32_t *mds_tipc_ref) } /* Code for getting the self tipc random number */ - if (!get_tipc_port_id(tipc_cb.BSRsock, mds_tipc_ref)) { + struct tipc_portid port_id; + if (!get_tipc_port_id(tipc_cb.BSRsock, &port_id)) { close(tipc_cb.Dsock); close(tipc_cb.BSRsock); return NCSCC_RC_FAILURE; } - + *mds_tipc_ref = port_id.ref; tipc_cb.adest = ((uint64_t)(nodeid)) << 32; tipc_cb.adest |= *mds_tipc_ref; tipc_cb.node_id = nodeid; @@ -325,6 +329,23 @@ uint32_t mdtm_tipc_init(NODE_ID nodeid, uint32_t *mds_tipc_ref) mdtm_set_transport(MDTM_TX_TYPE_TIPC); } + /* Get tipc socket receive buffer size */ + int optval; + socklen_t optlen = sizeof(optval); + if (getsockopt(tipc_cb.BSRsock, SOL_SOCKET, SO_RCVBUF, + &optval, &optlen) != 0) { + syslog(LOG_ERR, "MDTM: getsockopt() failed to get rcv buf size: %s", + strerror(errno)); + close(tipc_cb.Dsock); + close(tipc_cb.BSRsock); + return NCSCC_RC_FAILURE; + } + + /* Create flow control tasks (flow control is currently always enabled) */ + gl_mds_pro_ver = MDS_PROT_FCTRL; + mds_tipc_fctrl_initialize(tipc_cb.BSRsock, port_id, + (uint64_t)optval, tipc_mcast_enabled); + /* Create a task to receive the events and data */ if (mdtm_create_rcv_task(tipc_cb.hdle_mdtm) != NCSCC_RC_SUCCESS) { syslog(LOG_ERR, @@ -469,7 +490,7 @@ uint32_t mdtm_tipc_destroy(void) NULL); m_NCS_IPC_RELEASE(&tipc_cb.tmr_mbx, (NCS_IPC_CB)mdtm_mailbox_mbx_cleanup); - + mds_tipc_fctrl_shutdown(); /* Clear reference hdl list */ while (mdtm_ref_hdl_list_hdr != NULL) { /* Store temporary the pointer of entry to be deleted */ @@ -672,36 +693,26 @@ ssize_t recvfrom_connectionless(int sd, void *buf, size_t nbytes, int flags, 0)); if (anc_data[0] == TIPC_ERR_OVERLOAD) { LOG_ER( - "MDTM: From <0x%"PRIx32 ":%"PRIu32 "> 
undeliverable message condition ancillary data: TIPC_ERR_OVERLOAD", - ((struct sockaddr_tipc*) from)->addr.id.node, - ((struct sockaddr_tipc*) from)->addr.id.ref); - m_MDS_LOG_CRITICAL( "MDTM: From <0x%"PRIx32 ":%"PRIu32 "> undeliverable message condition ancillary data: TIPC_ERR_OVERLOAD ", ((struct sockaddr_tipc*) from)->addr.id.node, ((struct sockaddr_tipc*) from)->addr.id.ref); - } else { + } else if (anc_data[0] == TIPC_ERR_NO_PORT){ /* TIPC_ERRINFO - TIPC error * code associated with a * returned data message or a * connection termination * message */ - m_MDS_LOG_DBG( - "MDTM: undelivered message condition ancillary data: TIPC_ERRINFO err : %d", - anc_data[0]); + mds_tipc_fctrl_portid_terminate(((struct sockaddr_tipc*)from)->addr.id); + } else { + m_MDS_LOG_ERR("MDTM:: TIPC_ERRINFO anc_data[0]:%u", anc_data[0]); } } else if (anc->cmsg_type == TIPC_RETDATA) { - /* If we set TIPC_DEST_DROPPABLE off - message (configure TIPC to return - rejected messages to the sender ) we - will hit this when we implement MDS - retransmit lost messages, can be - replaced with flow control logic */ /* TIPC_RETDATA -The contents of a * returned data message */ - LOG_IN( - "MDTM: undelivered message condition ancillary data: TIPC_RETDATA"); - m_MDS_LOG_INFO( - "MDTM: undelivered message condition ancillary data: TIPC_RETDATA"); + LOG_IN("MDTM: undelivered message condition ancillary data: TIPC_RETDATA"); + uint16_t ret_msg_len = anc->cmsg_len - CMSG_LEN(0); /* payload length of the returned msg */ + unsigned char *ret_msg = CMSG_DATA(anc); + mds_tipc_fctrl_drop_data(ret_msg, ret_msg_len, ((struct sockaddr_tipc*)from)->addr.id); } else if (anc->cmsg_type == TIPC_DESTNAME) { if (sz == 0) { m_MDS_LOG_DBG( @@ -822,7 +833,7 @@ static uint32_t mdtm_process_recv_events(void) m_MDS_LOG_INFO( "MDTM: Published: "); m_MDS_LOG_INFO( - "MDTM: <%u,%u,%u> port id <0x%08x:%u>\n", + "MDTM: <%x,%x,%x> port id <0x%08x:%u>\n", NTOHL( event.s .seq @@ -841,7 +852,12 @@ static uint32_t mdtm_process_recv_events(void) event .port .ref)); - 
+ struct tipc_portid id = { + .node = NTOHL(event.port.node), + .ref = NTOHL(event.port.ref) + }; + uint32_t type = NTOHL(event.s.seq.type); + mds_tipc_fctrl_portid_up(id, type); if (NCSCC_RC_SUCCESS != mdtm_process_discovery_events( TIPC_PUBLISHED, @@ -873,7 +889,12 @@ static uint32_t mdtm_process_recv_events(void) event .port .ref)); - + struct tipc_portid id = { + .node = NTOHL(event.port.node), + .ref = NTOHL(event.port.ref) + }; + uint32_t type = NTOHL(event.s.seq.type); + mds_tipc_fctrl_portid_down(id, type); if (NCSCC_RC_SUCCESS != mdtm_process_discovery_events( TIPC_WITHDRAWN, @@ -986,8 +1007,6 @@ static uint32_t mdtm_process_recv_events(void) break; } if (recd_bytes == 0) { - m_MDS_LOG_DBG( - "MDTM: recd bytes=0 on received on sock, abnormal/unknown/hack condition. Ignoring"); break; } data = inbuf; @@ -1076,9 +1095,10 @@ static uint32_t mdtm_process_recv_events(void) &buff_dump); } #else - mdtm_process_recv_data( - &inbuf[2], recd_bytes - 2, - tipc_id, &buff_dump); + if (mds_tipc_fctrl_rcv_data(inbuf, recd_bytes, client_addr.addr.id) + == NCSCC_RC_SUCCESS) { + mdtm_process_recv_data(&inbuf[2], recd_bytes - 2, tipc_id, &buff_dump); + } #endif } else { uint64_t tipc_id; @@ -1873,9 +1893,9 @@ uint32_t mds_mdtm_svc_install_tipc(PW_ENV_ID pwe_id, MDS_SVC_ID svc_id, server_addr.addr.nameseq.upper = server_inst; /* The self tipc random port number */ - uint32_t port_id = 0; + struct tipc_portid port_id; get_tipc_port_id(tipc_cb.BSRsock, &port_id); - m_MDS_LOG_NOTIFY("MDTM: install_tipc : <p:%u,s:%u,i:%u>", port_id, + m_MDS_LOG_NOTIFY("MDTM: install_tipc : <p:%u,s:%u,i:%u>", port_id.ref, server_type, server_inst); if (0 != bind(tipc_cb.BSRsock, (struct sockaddr *)&server_addr, @@ -2495,6 +2515,7 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ *req) */ uint32_t status = 0; uint32_t sum_mds_hdr_plus_mdtm_hdr_plus_len; + uint16_t fctrl_seq_num = 0; int version = req->msg_arch_word & 0x7; if (version > 1) { sum_mds_hdr_plus_mdtm_hdr_plus_len = @@ -2583,11 +2604,16 @@ 
uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ *req) mdtm_add_mds_hdr(buffer_ack, req)) { return NCSCC_RC_FAILURE; } - + /* if sndqueue is capable, then obtain the current sending seq */ + if (mds_tipc_fctrl_sndqueue_capable(tipc_id, len, &fctrl_seq_num) + == NCSCC_RC_FAILURE){ + m_MDS_LOG_ERR("FCTRL: Failed to send message len :%d", len); + return NCSCC_RC_FAILURE; + } /* Add frag_hdr */ if (NCSCC_RC_SUCCESS != mdtm_add_frag_hdr(buffer_ack, len, frag_seq_num, - 0)) { + 0, fctrl_seq_num)) { return NCSCC_RC_FAILURE; } @@ -2676,13 +2702,22 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ *req) free(body); return NCSCC_RC_FAILURE; } + /* if sndqueue is capable, then obtain the current sending seq */ + if (mds_tipc_fctrl_sndqueue_capable(tipc_id, + len + sum_mds_hdr_plus_mdtm_hdr_plus_len, + &fctrl_seq_num) == NCSCC_RC_FAILURE){ + m_MDS_LOG_ERR("FCTRL: Failed to send message len :%d", len + sum_mds_hdr_plus_mdtm_hdr_plus_len); + m_MMGR_FREE_BUFR_LIST(usrbuf); /* same cleanup as the frag hdr failure path */ + free(body); + return NCSCC_RC_FAILURE; + } if (NCSCC_RC_SUCCESS != mdtm_add_frag_hdr( body, (len + sum_mds_hdr_plus_mdtm_hdr_plus_len), - frag_seq_num, 0)) { + frag_seq_num, 0, fctrl_seq_num)) { m_MDS_LOG_ERR( "MDTM: Unable to add the frag Hdr to the send msg\n"); m_MMGR_FREE_BUFR_LIST(usrbuf); @@ -2778,12 +2813,23 @@ uint32_t mds_mdtm_send_tipc(MDTM_SEND_REQ *req) req->msg.data.buff_info.buff); return NCSCC_RC_FAILURE; } + /* if sndqueue is capable, then obtain the current sending seq */ + if (mds_tipc_fctrl_sndqueue_capable(tipc_id, + req->msg.data.buff_info.len + sum_mds_hdr_plus_mdtm_hdr_plus_len, + &fctrl_seq_num) == NCSCC_RC_FAILURE) { + m_MDS_LOG_ERR("FCTRL: Failed to send message len :%d", + req->msg.data.buff_info.len + sum_mds_hdr_plus_mdtm_hdr_plus_len); + free(body); + mds_free_direct_buff(req->msg.data.buff_info.buff); + return NCSCC_RC_FAILURE; + } + if (NCSCC_RC_SUCCESS != mdtm_add_frag_hdr( body, req->msg.data.buff_info.len + sum_mds_hdr_plus_mdtm_hdr_plus_len, - frag_seq_num, 0)) { + frag_seq_num, 0, fctrl_seq_num)) { m_MDS_LOG_ERR( 
"MDTM: Unable to add the frag Hdr to the send msg\n"); free(body); @@ -2860,6 +2906,7 @@ uint32_t mdtm_frag_and_send(MDTM_SEND_REQ *req, uint32_t seq_num, uint16_t i = 1; uint16_t frag_val = 0; uint32_t sum_mds_hdr_plus_mdtm_hdr_plus_len; + uint16_t fctrl_seq_num = 0; int version = req->msg_arch_word & 0x7; uint32_t ret = NCSCC_RC_SUCCESS; @@ -2938,9 +2985,18 @@ uint32_t mdtm_frag_and_send(MDTM_SEND_REQ *req, uint32_t seq_num, return NCSCC_RC_FAILURE; } } + /* if sndqueue is capable, then obtain the current sending seq */ + if (mds_tipc_fctrl_sndqueue_capable(id, len_buf, &fctrl_seq_num) + == NCSCC_RC_FAILURE) { + m_MDS_LOG_ERR("FCTRL: Failed to send message len :%d", len_buf); + m_MMGR_FREE_BUFR_LIST(usrbuf); + free(body); + return NCSCC_RC_FAILURE; + } + if (NCSCC_RC_SUCCESS != mdtm_add_frag_hdr(body, len_buf, seq_num, - frag_val)) { + frag_val, fctrl_seq_num)) { m_MDS_LOG_ERR( "MDTM: Frag hde addition failed\n"); m_MMGR_FREE_BUFR_LIST(usrbuf); @@ -2996,7 +3052,7 @@ uint32_t mdtm_frag_and_send(MDTM_SEND_REQ *req, uint32_t seq_num, *********************************************************/ uint32_t mdtm_add_frag_hdr(uint8_t *buf_ptr, uint16_t len, uint32_t seq_num, - uint16_t frag_byte) + uint16_t frag_byte, uint16_t fctrl_seq_num) { /* Add the FRAG HDR to the Buffer */ uint8_t *data; @@ -3013,9 +3069,17 @@ uint32_t mdtm_add_frag_hdr(uint8_t *buf_ptr, uint16_t len, uint32_t seq_num, 5); /* 2 bytes for keeping len, to cross check at the receiver end */ #else - ncs_encode_16bit(&data, len - MDTM_FRAG_HDR_LEN - - 2); /* 2 bytes for keeping len, to cross - check at the receiver end */ + /* Next 2 bytes for keeping len, to cross check at the receiver end. + * Used to be encoded as below: + * + * ncs_encode_16bit(&data, len - MDTM_FRAG_HDR_LEN - 2); + * + * However, these 2 bytes have never been examined at the receiver. 
For backward + * compatibility, the fragment header and mds data header can't be extended; + * hereafter, these 2 bytes are used as the flow control sequence number + * (per tipc portid) + * */ + ncs_encode_16bit(&data, fctrl_seq_num); #endif return NCSCC_RC_SUCCESS; } @@ -3060,21 +3124,25 @@ static uint32_t mdtm_sendto(uint8_t *buffer, uint16_t buff_len, buffer[4] = checksum; } #endif - - send_len = sendto(tipc_cb.BSRsock, buffer, buff_len, 0, - (struct sockaddr *)&server_addr, sizeof(server_addr)); - if (send_len == buff_len) { - m_MDS_LOG_INFO("MDTM: Successfully sent message"); - return NCSCC_RC_SUCCESS; - } else if (send_len == -1) { - m_MDS_LOG_ERR("MDTM: Failed to send message err :%s", - strerror(errno)); - return NCSCC_RC_FAILURE; - } else { - m_MDS_LOG_ERR("MDTM: Failed to send message send_len :%zd", - send_len); - return NCSCC_RC_FAILURE; + if (mds_tipc_fctrl_trysend(buffer, buff_len, id) == NCSCC_RC_SUCCESS) { + send_len = sendto(tipc_cb.BSRsock, buffer, buff_len, 0, + (struct sockaddr *)&server_addr, sizeof(server_addr)); + if (send_len == buff_len) { + m_MDS_LOG_INFO("MDTM: Successfully sent message"); + return NCSCC_RC_SUCCESS; + } else if (send_len == -1) { + m_MDS_LOG_ERR("MDTM: Failed to send message err :%s", strerror(errno)); + // todo: it's failed to send msg, if flow control is enabled + // the msg needs to mark unsent + return NCSCC_RC_FAILURE; + } else { + m_MDS_LOG_ERR("MDTM: Failed to send message send_len :%zd", send_len); + // todo: it's failed to send msg, if flow control is enabled + // the msg needs to mark unsent + return NCSCC_RC_FAILURE; + } } + return NCSCC_RC_FAILURE; /* trysend rejected (unknown portid): nothing was sent */ } /********************************************************* @@ -3103,12 +3171,12 @@ static uint32_t mdtm_mcast_sendto(void *buffer, size_t size, server_addr.addr.nameseq.lower = HTONL(MDS_MDTM_LOWER_INSTANCE); /*This can be scope-down to dest_svc_id server_inst TBD*/ server_addr.addr.nameseq.upper = HTONL(MDS_MDTM_UPPER_INSTANCE); - int send_len = 
sendto(tipc_cb.BSRsock, buffer, size, 0, (struct sockaddr *)&server_addr, sizeof(server_addr)); + if (send_len == size) { - m_MDS_LOG_INFO("MDTM: Successfully sent message"); + m_MDS_LOG_INFO("MDTM: Successfully sent mcast message"); return NCSCC_RC_SUCCESS; } else { m_MDS_LOG_ERR("MDTM: Failed to send Multicast message err :%s", @@ -3151,7 +3219,7 @@ static uint32_t mdtm_add_mds_hdr(uint8_t *buffer, MDTM_SEND_REQ *req) uint8_t *ptr; ptr = buffer; - prot_ver |= MDS_PROT | MDS_VERSION | ((uint8_t)(req->pri - 1)); + prot_ver |= gl_mds_pro_ver | ((uint8_t)(req->pri - 1)); enc_snd_type = (req->msg.encoding << 6); enc_snd_type |= (uint8_t)req->snd_type; diff --git a/src/mds/mds_tipc_fctrl_intf.cc b/src/mds/mds_tipc_fctrl_intf.cc new file mode 100644 index 0000000..91b9107 --- /dev/null +++ b/src/mds/mds_tipc_fctrl_intf.cc @@ -0,0 +1,376 @@ +/* -*- OpenSAF -*- + * + * (C) Copyright 2019 The OpenSAF Foundation + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed + * under the GNU Lesser General Public License Version 2.1, February 1999. + * The complete license can be accessed from the following location: + * http://opensource.org/licenses/lgpl-license.php + * See the Copying file included with the OpenSAF distribution for full + * licensing terms.
+ * + * Author(s): Ericsson AB + * + */ +#include "mds/mds_tipc_fctrl_intf.h" + +#include <poll.h> +#include <pthread.h> +#include <stdio.h> +#include <sys/poll.h> +#include <unistd.h> + +#include <map> +#include <mutex> + +#include "base/ncssysf_def.h" +#include "base/ncssysf_tsk.h" + +#include "mds/mds_log.h" +#include "mds/mds_tipc_fctrl_portid.h" +#include "mds/mds_tipc_fctrl_msg.h" + +using mds::Event; +using mds::TipcPortId; +using mds::DataMessage; +using mds::ChunkAck; +using mds::HeaderMessage; + +namespace { +// multicast/broadcast enabled +// todo: to be removed once flow control supports it +bool is_mcast_enabled = true; + +// mailbox handles for events +SYSF_MBX mbx_events; + +// ncs task handle, filled in by create_ncs_task() and released on shutdown +NCSCONTEXT p_task_hdl = nullptr; + +// data socket associated with port id +int data_sock_fd = 0; +struct tipc_portid snd_rcv_portid; + +// socket receiver's buffer size +// todo: This buffer size is read from the local node as an estimated +// buffer size of the receiver sides; in fact it could be different +// at the receiver's sides (in the other nodes).
At this moment, we +// assume all nodes in cluster have set the same tipc buffer size +uint64_t sock_buf_size = 0; + +// map of key:unique id(adest), value: TipcPortId instance +// unique id is derived from struct tipc_portid +std::map<uint64_t, TipcPortId*> portid_map; +std::mutex portid_map_mutex; + +// chunk ack parameters +// todo: The chunk ack size should be configurable +uint16_t kChunkAckSize = 3; + +TipcPortId* portid_lookup(struct tipc_portid id) { // caller holds portid_map_mutex + uint64_t uid = TipcPortId::GetUniqueId(id); + auto it = portid_map.find(uid); // single lookup; never inserts + return (it == portid_map.end()) ? nullptr : it->second; +} + +uint32_t process_flow_event(const Event& evt) { // caller holds portid_map_mutex + uint32_t rc = NCSCC_RC_SUCCESS; + TipcPortId *portid = portid_lookup(evt.id_); + if (portid == nullptr) { + if (evt.type_ == Event::Type::kEvtRcvData) { + portid = new TipcPortId(evt.id_, data_sock_fd, kChunkAckSize, + sock_buf_size); + portid_map[TipcPortId::GetUniqueId(evt.id_)] = portid; + if (evt.type_ == Event::Type::kEvtRcvData) { + rc = portid->ReceiveData(evt.mseq_, evt.mfrag_, + evt.fseq_, evt.svc_id_); + } + } + } else { + if (evt.type_ == Event::Type::kEvtRcvData) { + rc = portid->ReceiveData(evt.mseq_, evt.mfrag_, + evt.fseq_, evt.svc_id_); + } + if (evt.type_ == Event::Type::kEvtRcvChunkAck) { + portid->ReceiveChunkAck(evt.fseq_, evt.chunk_size_); + } + if (evt.type_ == Event::Type::kEvtSendChunkAck) { + portid->SendChunkAck(evt.fseq_, evt.svc_id_, evt.chunk_size_); + } + if (evt.type_ == Event::Type::kEvtDropData) { + portid->ReceiveNack(evt.mseq_, evt.mfrag_, + evt.fseq_); + } + } + return rc; +} + +uint32_t process_all_events(void) { + enum { FD_FCTRL = 0, NUM_FDS }; + + int poll_tmo = MDTM_TIPC_POLL_TIMEOUT; + while (true) { + int pollres; + struct pollfd pfd[NUM_FDS] = {{0}}; + + pfd[FD_FCTRL].fd = + ncs_ipc_get_sel_obj(&mbx_events).rmv_obj; + pfd[FD_FCTRL].events = POLLIN; + + pollres = poll(pfd, NUM_FDS, poll_tmo); + + if (pollres == -1) { + if (errno == EINTR) continue; + m_MDS_LOG_ERR("FCTRL: poll() 
failed:%s", strerror(errno)); + break; + } + + if (pollres > 0) { + if (pfd[FD_FCTRL].revents & POLLIN) { + Event *evt = reinterpret_cast<Event*>(ncs_ipc_non_blk_recv( + &mbx_events)); + + if (evt == nullptr) continue; + + portid_map_mutex.lock(); + process_flow_event(*evt); + delete evt; + portid_map_mutex.unlock(); + } + } + } /* while */ + return NCSCC_RC_SUCCESS; +} + +uint32_t create_ncs_task(NCSCONTEXT *task_hdl) { // *task_hdl receives the new task handle + int policy = SCHED_OTHER; /*root defaults */ + int max_prio = sched_get_priority_max(policy); + int min_prio = sched_get_priority_min(policy); + int prio_val = ((max_prio - min_prio) * 0.87); + + if (m_NCS_IPC_CREATE(&mbx_events) != NCSCC_RC_SUCCESS) { + m_MDS_LOG_ERR("m_NCS_IPC_CREATE failed"); + return NCSCC_RC_FAILURE; + } + if (m_NCS_IPC_ATTACH(&mbx_events) != NCSCC_RC_SUCCESS) { + m_MDS_LOG_ERR("m_NCS_IPC_ATTACH failed"); + return NCSCC_RC_FAILURE; + } + if (ncs_task_create((NCS_OS_CB)process_all_events, 0, + "OSAF_MDS", prio_val, policy, NCS_MDTM_STACKSIZE, + task_hdl) != NCSCC_RC_SUCCESS) { + m_MDS_LOG_ERR("FCTRL: Task Creation-failed:\n"); + return NCSCC_RC_FAILURE; + } + + m_MDS_LOG_NOTIFY("FCTRL: Start process_all_events"); + return NCSCC_RC_SUCCESS; +} + +} // end local namespace + +uint32_t mds_tipc_fctrl_initialize(int dgramsock, struct tipc_portid id, + uint64_t rcv_buf_size, bool mcast_enabled) { + // publish the configuration before starting the task that reads it + data_sock_fd = dgramsock; + snd_rcv_portid = id; + sock_buf_size = rcv_buf_size; + is_mcast_enabled = mcast_enabled; + if (create_ncs_task(&p_task_hdl) != + NCSCC_RC_SUCCESS) { + m_MDS_LOG_ERR("FCTRL: Start of the Created Task-failed:\n"); + return NCSCC_RC_FAILURE; + } + m_MDS_LOG_NOTIFY("FCTRL: Initialize [node:%x, ref:%u]", + id.node, id.ref); + + return NCSCC_RC_SUCCESS; +} + +uint32_t mds_tipc_fctrl_shutdown(void) { + if (ncs_task_release(p_task_hdl) != NCSCC_RC_SUCCESS) { + m_MDS_LOG_ERR("FCTRL: Stop of the Created Task-failed:\n"); + } + return NCSCC_RC_SUCCESS; +} + +uint32_t mds_tipc_fctrl_sndqueue_capable(struct tipc_portid id, 
uint16_t len, + uint16_t* next_seq) { + uint32_t rc = NCSCC_RC_SUCCESS; + + portid_map_mutex.lock(); + + TipcPortId *portid = portid_lookup(id); + if (portid == nullptr) { + m_MDS_LOG_ERR("FCTRL: PortId not found [node:%x, ref:%u] line:%u", + id.node, id.ref, __LINE__); + rc = NCSCC_RC_FAILURE; + } else { + // assign the sequence number of the outgoing message + *next_seq = portid->GetCurrentSeq(); + } + + portid_map_mutex.unlock(); + + return rc; +} + +uint32_t mds_tipc_fctrl_trysend(const uint8_t *buffer, uint16_t len, + struct tipc_portid id) { + uint32_t rc = NCSCC_RC_SUCCESS; + + portid_map_mutex.lock(); + + TipcPortId *portid = portid_lookup(id); + if (portid == nullptr) { + m_MDS_LOG_ERR("FCTRL: PortId not found [node:%x, ref:%u] line:%u", + id.node, id.ref, __LINE__); + rc = NCSCC_RC_FAILURE; + } else { + portid->Queue(buffer, len); + } + + portid_map_mutex.unlock(); + + return rc; +} + +uint32_t mds_tipc_fctrl_portid_up(struct tipc_portid id, uint32_t type) { + MDS_SVC_ID svc_id = (uint16_t)(type & MDS_EVENT_MASK_FOR_SVCID); + + portid_map_mutex.lock(); + + // Add this new tipc portid to the map + TipcPortId *portid = portid_lookup(id); + uint64_t uid = TipcPortId::GetUniqueId(id); + if (portid == nullptr) { + portid_map[uid] = portid = new TipcPortId(id, data_sock_fd, + kChunkAckSize, sock_buf_size); + m_MDS_LOG_NOTIFY("FCTRL: Add portid[node:%x, ref:%u svc_id:%u], svc_cnt:%u", + id.node, id.ref, svc_id, portid->svc_cnt_); + } else { + portid->svc_cnt_++; + m_MDS_LOG_NOTIFY("FCTRL: Add svc[node:%x, ref:%u svc_id:%u], svc_cnt:%u", + id.node, id.ref, svc_id, portid->svc_cnt_); + } + + portid_map_mutex.unlock(); + + return NCSCC_RC_SUCCESS; +} + +uint32_t mds_tipc_fctrl_portid_down(struct tipc_portid id, uint32_t type) { + MDS_SVC_ID svc_id = (uint16_t)(type & MDS_EVENT_MASK_FOR_SVCID); + + portid_map_mutex.lock(); + + // Decrement the service count; the portid itself is removed on terminate + TipcPortId *portid = portid_lookup(id); + if (portid != nullptr) { + portid->svc_cnt_--; + 
m_MDS_LOG_NOTIFY("FCTRL: Remove svc[node:%x, ref:%u svc_id:%u], svc_cnt:%u", + id.node, id.ref, svc_id, portid->svc_cnt_); + } + portid_map_mutex.unlock(); + + return NCSCC_RC_SUCCESS; +} + +uint32_t mds_tipc_fctrl_portid_terminate(struct tipc_portid id) { + portid_map_mutex.lock(); + + // Delete this tipc portid out of the map + TipcPortId *portid = portid_lookup(id); + if (portid != nullptr) { + delete portid; + portid_map.erase(TipcPortId::GetUniqueId(id)); + m_MDS_LOG_NOTIFY("FCTRL: Remove portid[node:%x, ref:%u]", id.node, id.ref); + } + + portid_map_mutex.unlock(); + + return NCSCC_RC_SUCCESS; +} + +uint32_t mds_tipc_fctrl_drop_data(uint8_t *buffer, uint16_t len, + struct tipc_portid id) { + HeaderMessage header; + header.Decode(buffer); + // if mds supports flow control + if ((header.pro_ver_ & MDS_PROT_VER_MASK) == MDS_PROT_FCTRL) { + if (header.pro_id_ == MDS_PROT_FCTRL_ID) { + if (header.msg_type_ == ChunkAck::kChunkAckMsgType) { + // receive single ack message + ChunkAck ack; + ack.Decode(buffer); + // send to the event thread, which deletes evt after processing + Event *evt = new Event(Event::Type::kEvtSendChunkAck, id, ack.svc_id_, + header.mseq_, header.mfrag_, ack.acked_fseq_, ack.chunk_size_); + if (m_NCS_IPC_SEND(&mbx_events, evt, NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) { + m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events\n"); + delete evt; // not queued, still owned here + return NCSCC_RC_FAILURE; + } + return NCSCC_RC_SUCCESS; + } + } else { + // receive data message + DataMessage data; + data.Decode(buffer); + // send to the event thread, which deletes evt after processing + Event *evt = new Event(Event::Type::kEvtDropData, id, data.svc_id_, + header.mseq_, header.mfrag_, header.fseq_); + if (m_NCS_IPC_SEND(&mbx_events, evt, NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) { + m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events\n"); + delete evt; // not queued, still owned here + return NCSCC_RC_FAILURE; + } + return NCSCC_RC_SUCCESS; + } + } + + return NCSCC_RC_SUCCESS; +} + +uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t len, + struct tipc_portid id) { + HeaderMessage header; + header.Decode(buffer); + // if mds 
supports flow control + if ((header.pro_ver_ & MDS_PROT_VER_MASK) == MDS_PROT_FCTRL) { + if (header.pro_id_ == MDS_PROT_FCTRL_ID) { + if (header.msg_type_ == ChunkAck::kChunkAckMsgType) { + // receive single ack message + ChunkAck ack; + ack.Decode(buffer); + // send to the event thread, which deletes evt after processing + Event *evt = new Event(Event::Type::kEvtRcvChunkAck, id, ack.svc_id_, + header.mseq_, header.mfrag_, ack.acked_fseq_, ack.chunk_size_); + if (m_NCS_IPC_SEND(&mbx_events, evt, NCS_IPC_PRIORITY_HIGH) != NCSCC_RC_SUCCESS) { + m_MDS_LOG_ERR("FCTRL: Failed to send msg to mbx_events\n"); + delete evt; // not queued, still owned here + } + // return NCSCC_RC_FAILURE, so the tipc receiving thread (legacy) will + // skip this data msg + return NCSCC_RC_FAILURE; + } + } else { + // receive data message + DataMessage data; + data.Decode(buffer); + // todo: skip mcast/bcast, revisit + if ((data.snd_type_ == MDS_SENDTYPE_BCAST || + data.snd_type_ == MDS_SENDTYPE_RBCAST) && is_mcast_enabled) { + return NCSCC_RC_SUCCESS; + } + portid_map_mutex.lock(); + uint32_t rc = process_flow_event(Event(Event::Type::kEvtRcvData, + id, data.svc_id_, header.mseq_, header.mfrag_, header.fseq_)); + portid_map_mutex.unlock(); + return rc; + } + } + return NCSCC_RC_SUCCESS; +} diff --git a/src/mds/mds_tipc_fctrl_intf.h b/src/mds/mds_tipc_fctrl_intf.h new file mode 100644 index 0000000..85a058f --- /dev/null +++ b/src/mds/mds_tipc_fctrl_intf.h @@ -0,0 +1,47 @@ +/* -*- OpenSAF -*- + * + * (C) Copyright 2019 The OpenSAF Foundation + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed + * under the GNU Lesser General Public License Version 2.1, February 1999. 
+ * The complete license can be accessed from the following location: + * http://opensource.org/licenses/lgpl-license.php + * See the Copying file included with the OpenSAF distribution for full + * licensing terms.
+ * + * Author(s): Ericsson AB + * + */ + +#ifndef MDS_MDS_TIPC_FCTRL_INTF_H_ +#define MDS_MDS_TIPC_FCTRL_INTF_H_ + +#include <linux/tipc.h> +#include <stdbool.h> +#include <stdint.h> + +#ifdef __cplusplus +extern "C" { +#endif + +uint32_t mds_tipc_fctrl_initialize(int dgramsock, struct tipc_portid id, + uint64_t rcv_buf_size, bool mcast_enabled); +uint32_t mds_tipc_fctrl_shutdown(void); +uint32_t mds_tipc_fctrl_rcv_data(uint8_t *buffer, uint16_t len, + struct tipc_portid id); // on NCSCC_RC_FAILURE the caller must not deliver the msg +uint32_t mds_tipc_fctrl_portid_up(struct tipc_portid id, uint32_t type); +uint32_t mds_tipc_fctrl_portid_down(struct tipc_portid id, uint32_t type); +uint32_t mds_tipc_fctrl_portid_terminate(struct tipc_portid id); +uint32_t mds_tipc_fctrl_drop_data(uint8_t *buffer, uint16_t len, + struct tipc_portid id); +uint32_t mds_tipc_fctrl_sndqueue_capable(struct tipc_portid id, uint16_t len, + uint16_t* next_seq); // fails if id is unknown; else sets *next_seq +uint32_t mds_tipc_fctrl_trysend(const uint8_t *buffer, uint16_t len, + struct tipc_portid id); // queues buffer for id; fails if id is unknown +#ifdef __cplusplus +} +#endif + +#endif // MDS_MDS_TIPC_FCTRL_INTF_H_ diff --git a/src/mds/mds_tipc_fctrl_msg.cc b/src/mds/mds_tipc_fctrl_msg.cc new file mode 100644 index 0000000..abd38d3 --- /dev/null +++ b/src/mds/mds_tipc_fctrl_msg.cc @@ -0,0 +1,142 @@ +/* -*- OpenSAF -*- + * + * (C) Copyright 2019 The OpenSAF Foundation + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed + * under the GNU Lesser General Public License Version 2.1, February 1999. + * The complete license can be accessed from the following location: + * http://opensource.org/licenses/lgpl-license.php + * See the Copying file included with the OpenSAF distribution for full + * licensing terms.
+ * + * Author(s): Ericsson AB + * + */ + +#include "mds/mds_tipc_fctrl_msg.h" +#include "base/ncssysf_def.h" + +namespace mds { +HeaderMessage::HeaderMessage(uint16_t msg_len, uint32_t mseq, + uint16_t mfrag, uint16_t fseq): msg_len_(msg_len), + mseq_(mseq), mfrag_(mfrag), fseq_(fseq) { + pro_ver_ = MDS_PROT_FCTRL; + pro_id_ = 0; + msg_type_ = 0; +} + +void HeaderMessage::Encode(uint8_t *msg) { // writes len, mseq, mfrag and pro_ver; oct8&9 untouched + uint8_t *ptr; + + // encode message length + ptr = &msg[0]; + ncs_encode_16bit(&ptr, msg_len_); + // encode sequence number + ptr = &msg[2]; + ncs_encode_32bit(&ptr, mseq_); + // encode fragment number + ptr = &msg[6]; + ncs_encode_16bit(&ptr, mfrag_); + // skip length_check: oct8&9 (the flow control seq number; not written here) + // encode protocol version (always MDS_PROT_FCTRL; pro_ver_ is not used) + ptr = &msg[10]; + ncs_encode_8bit(&ptr, MDS_PROT_FCTRL); +} + +void HeaderMessage::Decode(uint8_t *msg) { // FCTRL-only fields are decoded only when pro_ver matches + uint8_t *ptr; + + // decode message length + ptr = &msg[0]; + msg_len_ = ncs_decode_16bit(&ptr); + // decode sequence number + ptr = &msg[2]; + mseq_ = ncs_decode_32bit(&ptr); + // decode fragment number + ptr = &msg[6]; + mfrag_ = ncs_decode_16bit(&ptr); + // decode protocol version + ptr = &msg[10]; + pro_ver_ = ncs_decode_8bit(&ptr); + if ((pro_ver_ & MDS_PROT_VER_MASK) == MDS_PROT_FCTRL) { + // decode flow control sequence number + ptr = &msg[8]; + fseq_ = ncs_decode_16bit(&ptr); + // decode protocol identifier + ptr = &msg[11]; + pro_id_ = ncs_decode_32bit(&ptr); + if (pro_id_ == MDS_PROT_FCTRL_ID) { + // decode message type + ptr = &msg[15]; + msg_type_ = ncs_decode_8bit(&ptr); + } + } else { // NOTE(review): presumably meant for non-first fragments (no MDS hdr at byte 10); a legacy sender stores a non-zero length in oct8&9, so its fragments would also be classified as FCTRL here - confirm + if (mfrag_ != 0) { + ptr = &msg[8]; + fseq_ = ncs_decode_16bit(&ptr); + if (fseq_ != 0) pro_ver_ = MDS_PROT_FCTRL; + } + } +} + +void DataMessage::Decode(uint8_t *msg) { + uint8_t *ptr; + + // decode service id + ptr = &msg[MDTM_PKT_TYPE_OFFSET + + MDTM_FRAG_HDR_LEN + + MDS_HEADER_RCVR_SVC_ID_POSITION]; + svc_id_ = ncs_decode_16bit(&ptr); + // decode snd_type (low 6 bits of the byte at hard-coded offset 17) + ptr = &msg[17]; + snd_type_ = (ncs_decode_8bit(&ptr)) & 0x3f; +} + +DataMessage::~DataMessage() { + 
if (msg_data_ != nullptr) { + delete msg_data_; // NOTE(review): must be delete[] if msg_data_ is allocated with new[]; declaration not visible - confirm + msg_data_ = nullptr; + } +} + +ChunkAck::ChunkAck(uint16_t svc_id, uint16_t fseq, uint16_t chunk_size): + svc_id_(svc_id), acked_fseq_(fseq), chunk_size_(chunk_size) { + msg_type_ = kChunkAckMsgType; +} + +void ChunkAck::Encode(uint8_t *msg) { // writes only the ChunkAck-specific octets 11..21 + uint8_t *ptr; + // encode protocol identifier + ptr = &msg[11]; + ncs_encode_32bit(&ptr, MDS_PROT_FCTRL_ID); + // encode message type + ptr = &msg[15]; + ncs_encode_8bit(&ptr, kChunkAckMsgType); + // encode service id + ptr = &msg[16]; + ncs_encode_16bit(&ptr, svc_id_); + // encode flow control sequence number + ptr = &msg[18]; + ncs_encode_16bit(&ptr, acked_fseq_); + // encode chunk size + ptr = &msg[20]; + ncs_encode_16bit(&ptr, chunk_size_); +} + +void ChunkAck::Decode(uint8_t *msg) { + uint8_t *ptr; + + // decode service id + ptr = &msg[16]; + svc_id_ = ncs_decode_16bit(&ptr); + // decode flow control sequence number + ptr = &msg[18]; + acked_fseq_ = ncs_decode_16bit(&ptr); + // decode chunk size + ptr = &msg[20]; + chunk_size_ = ncs_decode_16bit(&ptr); +} + +} // end namespace mds diff --git a/src/mds/mds_tipc_fctrl_msg.h b/src/mds/mds_tipc_fctrl_msg.h new file mode 100644 index 0000000..677f256 --- /dev/null +++ b/src/mds/mds_tipc_fctrl_msg.h @@ -0,0 +1,129 @@ +/* -*- OpenSAF -*- + * + * (C) Copyright 2019 The OpenSAF Foundation + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed + * under the GNU Lesser General Public License Version 2.1, February 1999. + * The complete license can be accessed from the following location: + * http://opensource.org/licenses/lgpl-license.php + * See the Copying file included with the OpenSAF distribution for full + * licensing terms.
+ * + * Author(s): Ericsson AB + * + */ + +#ifndef MDS_MDS_TIPC_FCTRL_MSG_H_ +#define MDS_MDS_TIPC_FCTRL_MSG_H_ + +#include <linux/tipc.h> +#include "base/ncssysf_ipc.h" +#include "base/ncssysf_tmr.h" +#include "mds/mds_dt.h" + +namespace mds { + +class Event { + public: + enum class Type { + kEvtPortIdAll = 0, + kEvtPortIdUp, // event of new portid published (not used) + kEvtPortIdDown, // event of portid widthdrawn (not used) + + kEvtDataFlowAll, + kEvtRcvData, // event that received data msg from peer + kEvtSendChunkAck, // event to send the ack for a chunk of N accumulative + // data msgs (N>=1) + kEvtRcvChunkAck, // event that received the ack for a chunk of N + // accumulative data msgs (N>=1) + kEvtSendSelectiveAck, // event to send the ack for a number of selective + // data msgs (not supported) + kEvtRcvSelectiveAck, // event that received the ack for a numer of + // selective data msgs (not supported) + kEvtDropData, // event reported from tipc that a message is not + // delivered + }; + NCS_IPC_MSG next_{0}; + Type type_; + + // Used for flow event only + struct tipc_portid id_; + uint16_t svc_id_{0}; + uint32_t mseq_{0}; + uint16_t mfrag_{0}; + uint16_t fseq_{0}; + uint16_t chunk_size_{1}; + explicit Event(Type type):type_(type) {} + Event(Type type, struct tipc_portid id, uint16_t svc_id, + uint32_t mseq, uint16_t mfrag, uint16_t f_seg_num): + id_(id), svc_id_(svc_id), + mseq_(mseq), mfrag_(mfrag), fseq_(f_seg_num) { + type_ = type; + } + Event(Type type, struct tipc_portid id, uint16_t svc_id, uint32_t mseq, + uint16_t mfrag, uint16_t f_seg_num, uint16_t chunk_size): + id_(id), svc_id_(svc_id), mseq_(mseq), mfrag_(mfrag), + fseq_(f_seg_num), chunk_size_(chunk_size) { + type_ = type; + } +}; + +class BaseMessage { + public: + virtual ~BaseMessage() {} + virtual void Decode(uint8_t *msg) {} + virtual void Encode(uint8_t *msg) {} +}; + +class HeaderMessage: public BaseMessage { + public: + uint8_t* msg_ptr_{nullptr}; + uint16_t msg_len_{0}; + uint32_t 
mseq_{0}; + uint16_t mfrag_{0}; + uint16_t fseq_{0}; + uint8_t pro_ver_{0}; + uint32_t pro_id_{0}; + uint16_t msg_type_{0}; + HeaderMessage() {} + HeaderMessage(uint16_t msg_len, uint32_t mseq, uint16_t mfrag, + uint16_t fseq); + virtual ~HeaderMessage() {} + void Decode(uint8_t *msg) override; + void Encode(uint8_t *msg) override; +}; + +class DataMessage: public BaseMessage { + public: + HeaderMessage header_; + uint16_t svc_id_{0}; + + uint8_t* msg_data_{nullptr}; + uint8_t snd_type_{0}; + + DataMessage() {} + virtual ~DataMessage(); + void Decode(uint8_t *msg) override; +}; + +class ChunkAck: public BaseMessage { + public: + static const uint8_t kChunkAckMsgType = 1; + static const uint16_t kChunkAckMsgLength = 22; + + uint8_t msg_type_{0}; + uint16_t svc_id_{0}; + uint16_t acked_fseq_{0}; + uint16_t chunk_size_{1}; + ChunkAck() {} + ChunkAck(uint16_t svc_id, uint16_t fseq, uint16_t chunk_size); + virtual ~ChunkAck() {} + void Encode(uint8_t *msg) override; + void Decode(uint8_t *msg) override; +}; + +} // end namespace mds + +#endif // MDS_MDS_TIPC_FCTRL_MSG_H_ diff --git a/src/mds/mds_tipc_fctrl_portid.cc b/src/mds/mds_tipc_fctrl_portid.cc new file mode 100644 index 0000000..24d13ee --- /dev/null +++ b/src/mds/mds_tipc_fctrl_portid.cc @@ -0,0 +1,261 @@ +/* -*- OpenSAF -*- + * + * (C) Copyright 2019 The OpenSAF Foundation + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. This file and program are licensed + * under the GNU Lesser General Public License Version 2.1, February 1999. + * The complete license can be accessed from the following location: + * http://opensource.org/licenses/lgpl-license.php + * See the Copying file included with the OpenSAF distribution for full + * licensing terms. 
+ * + * Author(s): Ericsson AB + * + */ + +#include "mds/mds_tipc_fctrl_portid.h" +#include "base/ncssysf_def.h" + +#include "mds/mds_dt.h" +#include "mds/mds_log.h" + +namespace mds { + +void MessageQueue::Queue(DataMessage* msg) { + queue_.push_back(msg); +} + +DataMessage* MessageQueue::Find(uint32_t mseq, uint16_t mfrag) { + for (auto it = queue_.begin(); it != queue_.end(); ++it) { + DataMessage *m = *it; + if (m->header_.mseq_ == mseq && m->header_.mfrag_ == mfrag) { + return m; + } + } + return nullptr; +} + +uint64_t MessageQueue::Erase(uint16_t fseq_from, uint16_t fseq_to) { + uint64_t msg_len = 0; + for (auto it = queue_.begin(); it != queue_.end();) { + DataMessage *m = *it; + if (fseq_from <= m->header_.fseq_ && + m->header_.fseq_ <= fseq_to) { + msg_len += m->header_.msg_len_; + it = queue_.erase(it); + delete m; + } else { + it++; + } + } + return msg_len; +} + +void MessageQueue::Clear() { + while (queue_.empty() == false) { + DataMessage* msg = queue_.front(); + queue_.pop_front(); + delete msg; + } +} + +TipcPortId::TipcPortId(struct tipc_portid id, int sock, uint16_t chksize, + uint64_t sock_buf_size): + id_(id), bsrsock_(sock), chunk_size_(chksize), rcv_buf_size_(sock_buf_size) { +} + +TipcPortId::~TipcPortId() { + // clear all msg in sndqueue_ + sndqueue_.Clear(); +} + +uint64_t TipcPortId::GetUniqueId(struct tipc_portid id) { + // this uid is equivalent to the mds adest + uint64_t uid = ((uint64_t)id.node << 32) | (uint64_t)id.ref; + return uid; +} + +uint32_t TipcPortId::Send(uint8_t* data, uint16_t length) { + struct sockaddr_tipc server_addr; + ssize_t send_len = 0; + uint32_t rc = NCSCC_RC_SUCCESS; + + memset(&server_addr, 0, sizeof(server_addr)); + server_addr.family = AF_TIPC; + server_addr.addrtype = TIPC_ADDR_ID; + server_addr.addr.id = id_; + send_len = sendto(bsrsock_, data, length, 0, + (struct sockaddr *)&server_addr, sizeof(server_addr)); + + if (send_len == length) { + rc = NCSCC_RC_SUCCESS; + } else { + m_MDS_LOG_ERR("sendto() 
err :%s", strerror(errno)); + rc = NCSCC_RC_FAILURE; + } + return rc; +} + +uint32_t TipcPortId::Queue(const uint8_t* data, uint16_t length) { + uint32_t rc = NCSCC_RC_SUCCESS; + + DataMessage *msg = new DataMessage; + msg->header_.Decode(const_cast<uint8_t*>(data)); + msg->Decode(const_cast<uint8_t*>(data)); + msg->msg_data_ = new uint8_t[length]; + memcpy(msg->msg_data_, data, length); + sndqueue_.Queue(msg); + ++sndwnd_.send_; + sndwnd_.nacked_space_ += length; + m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], " + "SndData[mseq:%u, mfrag:%u, fseq:%u, len:%u], " + "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", + id_.node, id_.ref, + msg->header_.mseq_, msg->header_.mfrag_, msg->header_.fseq_, length, + sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_); + + return rc; +} + +bool TipcPortId::ReceiveCapable(uint16_t sending_len) { + return true; +} + +void TipcPortId::SendChunkAck(uint16_t fseq, uint16_t svc_id, + uint16_t chksize) { + uint8_t data[ChunkAck::kChunkAckMsgLength]; + + HeaderMessage header(ChunkAck::kChunkAckMsgLength, 0, 0, fseq); + header.Encode(reinterpret_cast<uint8_t*>(&data)); + + ChunkAck sack(svc_id, fseq, chksize); + sack.Encode(reinterpret_cast<uint8_t*>(&data)); + Send(reinterpret_cast<uint8_t*>(&data), ChunkAck::kChunkAckMsgLength); + m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], " + "SndChkAck[fseq:%u, chunk:%u]", + id_.node, id_.ref, + fseq, chksize); +} + +uint32_t TipcPortId::ReceiveData(uint32_t mseq, uint16_t mfrag, + uint16_t fseq, uint16_t svc_id) { + uint32_t rc = NCSCC_RC_SUCCESS; + // update receiver sequence window + if (rcvwnd_.acked_ < fseq && rcvwnd_.rcv_ + 1 == fseq) { + m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], " + "RcvData[mseq:%u, mfrag:%u, fseq:%u], " + "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "]", + id_.node, id_.ref, + mseq, mfrag, fseq, + rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_); + + ++rcvwnd_.rcv_; + if (rcvwnd_.rcv_ - rcvwnd_.acked_ >= chunk_size_) { + // send ack for @chunk_size_ 
msgs starting from fseq + SendChunkAck(fseq, svc_id, chunk_size_); + rcvwnd_.acked_ = rcvwnd_.rcv_; + } + } else { + // todo: update rcvwnd_.nacked_space_. + // This nacked_space_ will tell the number of bytes that has not been acked + // to the sender. If this nacked_space_ is growing large, and approaching + // the socket buffer size, the transmission of sender may be queued. + // this nacked_space_ can be used to detect if the kChunkAckTimeout or + // kChunkAckSize are set excessively large. + // It is not used for now, so ignore it. + + // check for transmission error + if (rcvwnd_.rcv_ + 1 < fseq) { + if (rcvwnd_.rcv_ == 0 && rcvwnd_.acked_ == 0) { + // peer does not realize that this portid reset + m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], " + "RcvData[mseq:%u, mfrag:%u, fseq:%u], " + "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "], " + "Warning[portid reset]", + id_.node, id_.ref, + mseq, mfrag, fseq, + rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_); + + rcvwnd_.rcv_ = fseq; + } else { + rc = NCSCC_RC_FAILURE; + // msg loss + m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], " + "RcvData[mseq:%u, mfrag:%u, fseq:%u], " + "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "], " + "Error[msg loss]", + id_.node, id_.ref, + mseq, mfrag, fseq, + rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_); + } + } + if (fseq <= rcvwnd_.acked_) { + rc = NCSCC_RC_FAILURE; + // unexpected retransmission + m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], " + "RcvData[mseq:%u, mfrag:%u, fseq:%u], " + "rcvwnd[acked:%u, rcv:%u, nacked:%" PRIu64 "], " + "Error[unexpected retransmission]", + id_.node, id_.ref, + mseq, mfrag, fseq, + rcvwnd_.acked_, rcvwnd_.rcv_, rcvwnd_.nacked_space_); + } + } + return rc; +} + +void TipcPortId::ReceiveChunkAck(uint16_t fseq, uint16_t chksize) { + // update sender sequence window + if (sndwnd_.acked_ < fseq && fseq < sndwnd_.send_) { + m_MDS_LOG_DBG("FCTRL: [me] <-- [node:%x, ref:%u], " + "RcvChkAck[fseq:%u, chunk:%u], " + "sndwnd[acked:%u, 
send:%u, nacked:%" PRIu64 "], " + "queue[size:%" PRIu64 "]", + id_.node, id_.ref, + fseq, chksize, + sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_, + sndqueue_.Size()); + + // fast forward the sndwnd_.acked_ sequence to fseq + sndwnd_.acked_ = fseq; + + // remove a number @chksize messages out of sndqueue_ and decrease + // the nacked_space_ of sender + sndwnd_.nacked_space_ -= sndqueue_.Erase(fseq - chksize + 1, fseq); + } else { + m_MDS_LOG_ERR("FCTRL: [me] <-- [node:%x, ref:%u], " + "RcvChkAck[fseq:%u, chunk:%u], " + "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "], " + "queue[size:%" PRIu64 "], " + "Error[msg disordered]", + id_.node, id_.ref, + fseq, chksize, + sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_, + sndqueue_.Size()); + } +} + +void TipcPortId::ReceiveNack(uint32_t mseq, uint16_t mfrag, + uint16_t fseq) { + DataMessage* msg = sndqueue_.Find(mseq, mfrag); + if (msg != nullptr) { + // Resend the msg found + Send(msg->msg_data_, msg->header_.msg_len_); + m_MDS_LOG_DBG("FCTRL: [me] --> [node:%x, ref:%u], " + "RsndData[mseq:%u, mfrag:%u, fseq:%u], " + "sndwnd[acked:%u, send:%u, nacked:%" PRIu64 "]", + id_.node, id_.ref, + mseq, mfrag, fseq, + sndwnd_.acked_, sndwnd_.send_, sndwnd_.nacked_space_); + } else { + m_MDS_LOG_ERR("FCTRL: [me] --> [node:%x, ref:%u], " + "RsndData[mseq:%u, mfrag:%u, fseq:%u], " + "Error[msg not found]", + id_.node, id_.ref, + mseq, mfrag, fseq); + } +} + +} // end namespace mds diff --git a/src/mds/mds_tipc_fctrl_portid.h b/src/mds/mds_tipc_fctrl_portid.h new file mode 100644 index 0000000..8068e6e --- /dev/null +++ b/src/mds/mds_tipc_fctrl_portid.h @@ -0,0 +1,87 @@ +/* -*- OpenSAF -*- + * + * (C) Copyright 2019 The OpenSAF Foundation + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. 
This file and program are licensed + * under the GNU Lesser General Public License Version 2.1, February 1999. + * The complete license can be accessed from the following location: + * http://opensource.org/licenses/lgpl-license.php + * See the Copying file included with the OpenSAF distribution for full + * licensing terms. + * + * Author(s): Ericsson AB + * + */ + +#ifndef MDS_MDS_TIPC_FCTRL_PORTID_H_ +#define MDS_MDS_TIPC_FCTRL_PORTID_H_ + +#include <linux/tipc.h> +#include <stdbool.h> +#include <stdint.h> +#include <stdio.h> +#include <unistd.h> +#include <deque> +#include "mds/mds_tipc_fctrl_msg.h" + +namespace mds { + +class MessageQueue { + public: + void Queue(DataMessage* msg); + DataMessage* Find(uint32_t mseq, uint16_t mfrag); + uint64_t Erase(uint16_t fseq_from, uint16_t fseq_to); + uint64_t Size() const { return queue_.size(); } + void Clear(); + private: + std::deque<DataMessage*> queue_; +}; + +class TipcPortId { + public: + TipcPortId(struct tipc_portid id, int sock, uint16_t chunk_size, + uint64_t sock_buf_size); + ~TipcPortId(); + static uint64_t GetUniqueId(struct tipc_portid id); + int GetSock() const { return bsrsock_; } + uint16_t GetCurrentSeq() { return sndwnd_.send_; } + bool ReceiveCapable(uint16_t sending_len); + void ReceiveChunkAck(uint16_t fseq, uint16_t chunk_size); + void SendChunkAck(uint16_t fseq, uint16_t svc_id, uint16_t chunk_size); + uint32_t ReceiveData(uint32_t mseq, uint16_t mfrag, + uint16_t fseq, uint16_t svc_id); + void ReceiveNack(uint32_t mseq, uint16_t mfrag, uint16_t fseq); + uint32_t Send(uint8_t* data, uint16_t length); + uint32_t Queue(const uint8_t* data, uint16_t length); + + uint16_t svc_cnt_{1}; // number of service subscribed on this portid + + private: + struct tipc_portid id_; + int bsrsock_; // tipc socket to send/receive data per tipc_portid + uint16_t chunk_size_{5}; + uint64_t rcv_buf_size_{0}; // estimated buffer size at receiver + + struct sndwnd { + // sender sequence window + uint16_t acked_{0}; // 
last sequence has been acked by receiver + uint16_t send_{1}; // next sequence to be sent + uint64_t nacked_space_{0}; // total bytes are sent but not acked + }; + struct sndwnd sndwnd_; + + struct rcvwnd { + // receiver sequence window + uint16_t acked_{0}; // last sequence has been acked to sender + uint16_t rcv_{0}; // last sequence has been received + uint64_t nacked_space_{0}; // total bytes has not been acked + }; + struct rcvwnd rcvwnd_; + + MessageQueue sndqueue_; +}; + +} // end namespace mds +#endif // MDS_MDS_TIPC_FCTRL_PORTID_H_ -- 2.7.4 _______________________________________________ Opensaf-devel mailing list Opensaf-devel@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/opensaf-devel