osaf/libs/core/mds/mds_dt_tipc.c | 374 +++++++++++++++++++++----------------- 1 files changed, 203 insertions(+), 171 deletions(-)
This first part changes the socket descriptors to non blocking at receive. This to read all incoming messages before returning to the next poll call. diff --git a/osaf/libs/core/mds/mds_dt_tipc.c b/osaf/libs/core/mds/mds_dt_tipc.c --- a/osaf/libs/core/mds/mds_dt_tipc.c +++ b/osaf/libs/core/mds/mds_dt_tipc.c @@ -565,15 +565,13 @@ static uint32_t mdtm_destroy_rcv_task(vo ssize_t recvfrom_connectionless (int sd, void *buf, size_t nbytes, int flags, struct sockaddr *from, socklen_t *addrlen) { - struct msghdr msg; - struct iovec iov; + struct msghdr msg = {0}; + struct iovec iov = {0}; char anc_buf[CMSG_SPACE(8) + CMSG_SPACE(1024) + CMSG_SPACE(12)]; struct cmsghdr *anc; int has_addr; int anc_data[2]; - ssize_t sz; - has_addr = (from != NULL) && (addrlen != NULL); iov.iov_base = buf; @@ -586,55 +584,63 @@ ssize_t recvfrom_connectionless (int sd, msg.msg_control = anc_buf; msg.msg_controllen = sizeof(anc_buf); - sz = recvmsg(sd, &msg, flags); - if (sz >= 0) { - anc = CMSG_FIRSTHDR(&msg); - if (anc == NULL) { - m_MDS_LOG_DBG("MDTM: size: %d anc is NULL", (int)sz); + while (true) { + ssize_t sz = recvmsg(sd, &msg, flags); + if (sz == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return sz; + } else if (errno == EINTR) { + continue; + } else { + /* -1 indicates connection termination connectionless this not possible */ + abort(); + } } - while (anc != NULL) { - - /* Receipt of a normal data message never creates the TIPC_ERRINFO - and TIPC_RETDATA objects, and only creates the TIPC_DESTNAME object - if the message was sent using a TIPC name or name sequence as the - destination rather than a TIPC port ID*/ - if (anc->cmsg_type == TIPC_ERRINFO) { - anc_data[0] = *((unsigned int*)(CMSG_DATA(anc) + 0)); - if (anc_data[0] == TIPC_ERR_OVERLOAD) { - LOG_CR("MDTM: undelivered message condition ancillary data: TIPC_ERR_OVERLOAD"); - m_MDS_LOG_CRITICAL("MDTM: undelivered message condition ancillary data: TIPC_ERR_OVERLOAD"); + if (sz >= 0) { + anc = CMSG_FIRSTHDR(&msg); + if (anc == NULL) { + m_MDS_LOG_DBG("MDTM: size: %d anc is NULL", (int)sz); + } + while (anc != NULL) { + + /* Receipt of a normal data message never creates the TIPC_ERRINFO + and TIPC_RETDATA objects, and only creates the TIPC_DESTNAME object + if the message was sent using a TIPC name or name sequence as the + destination rather than a TIPC port ID*/ + if (anc->cmsg_type == TIPC_ERRINFO) { + anc_data[0] = *((unsigned int*)(CMSG_DATA(anc) + 0)); + if (anc_data[0] == TIPC_ERR_OVERLOAD) { + LOG_CR("MDTM: undelivered message condition ancillary data: TIPC_ERR_OVERLOAD"); + m_MDS_LOG_CRITICAL("MDTM: undelivered message condition ancillary data: TIPC_ERR_OVERLOAD"); + } else { + /* TIPC_ERRINFO - TIPC error code associated with a returned data message or a connection termination message */ + m_MDS_LOG_DBG("MDTM: undelivered message condition ancillary data: TIPC_ERRINFO err : %d", anc_data[0]); + } + } else if (anc->cmsg_type == TIPC_RETDATA) { + /* If we set TIPC_DEST_DROPPABLE off message (configure TIPC to return rejected messages to the sender ) + we will hit this when we implement MDS retransmit lost messages, can be replaced with flow control logic */ + /* TIPC_RETDATA -The contents of a returned data message */ + LOG_CR("MDTM: undelivered message condition ancillary data: TIPC_RETDATA"); + m_MDS_LOG_CRITICAL("MDTM: undelivered message condition ancillary data: TIPC_RETDATA"); + } else if (anc->cmsg_type == TIPC_DESTNAME) { + if (sz == 0) { + m_MDS_LOG_DBG("MDTM: recd bytes=0 on received on sock, abnormal/unknown condition. Ignoring"); + } } else { - /* TIPC_ERRINFO - TIPC error code associated with a returned data message or a connection termination message */ - m_MDS_LOG_DBG("MDTM: undelivered message condition ancillary data: TIPC_ERRINFO err : %d", anc_data[0]); + m_MDS_LOG_INFO("MDTM: unrecognized ancillary data type %u\n", anc->cmsg_type); + if (sz == 0) { + m_MDS_LOG_DBG("MDTM: recd bytes=0 on received on sock, abnormal/unkown condition. Ignoring"); + } } - } else if (anc->cmsg_type == TIPC_RETDATA) { - /* If we set TIPC_DEST_DROPPABLE off message (configure TIPC to return rejected messages to the sender ) - we will hit this when we implement MDS retransmit lost messages, can be replaced with flow control logic */ - /* TIPC_RETDATA -The contents of a returned data message */ - LOG_CR("MDTM: undelivered message condition ancillary data: TIPC_RETDATA"); - m_MDS_LOG_CRITICAL("MDTM: undelivered message condition ancillary data: TIPC_RETDATA"); - } else if (anc->cmsg_type == TIPC_DESTNAME) { - if (sz == 0) { - m_MDS_LOG_DBG("MDTM: recd bytes=0 on received on sock, abnormal/unknown condition. Ignoring"); - } - } else { - m_MDS_LOG_INFO("MDTM: unrecognized ancillary data type %u\n", anc->cmsg_type); - if (sz == 0) { - m_MDS_LOG_DBG("MDTM: recd bytes=0 on received on sock, abnormal/unkown condition. Ignoring"); - } + + anc = CMSG_NXTHDR(&msg, anc); } - anc = CMSG_NXTHDR(&msg, anc); + if (has_addr) + *addrlen = msg.msg_namelen; } - - if (has_addr) - *addrlen = msg.msg_namelen; - } else { - /* -1 indicates connection termination connectionless this not possible */ - abort(); + return sz; } - - return sz; } /********************************************************* @@ -651,91 +657,112 @@ ssize_t recvfrom_connectionless (int sd, *********************************************************/ static uint32_t mdtm_process_recv_events(void) { + enum { + FD_DSOCK = 0, + FD_BSRSOCK, + FD_TMRFD, + NUM_FDS + }; + /* STEP 1: Poll on the BSRsock and Dsock to get the events if data is received process the received data if discovery events are received , process the discovery events */ - while (1) { + while (true) { unsigned int pollres; - struct pollfd pfd[3]; + struct pollfd pfd[NUM_FDS] = {{0}}; struct tipc_event event; - pfd[0].fd = tipc_cb.Dsock; - pfd[0].events = POLLIN; - pfd[1].fd = tipc_cb.BSRsock; - pfd[1].events = POLLIN; - pfd[2].fd = tipc_cb.tmr_fd; - pfd[2].events = POLLIN; - - pfd[0].revents = pfd[1].revents = pfd[2].revents = 0; - - pollres = poll(pfd, 3, MDTM_TIPC_POLL_TIMEOUT); + pfd[FD_DSOCK].fd = tipc_cb.Dsock; + pfd[FD_DSOCK].events = POLLIN; + pfd[FD_BSRSOCK].fd = tipc_cb.BSRsock; + pfd[FD_BSRSOCK].events = POLLIN; + pfd[FD_TMRFD].fd = tipc_cb.tmr_fd; + pfd[FD_TMRFD].events = POLLIN; + + pfd[FD_DSOCK].revents = pfd[FD_BSRSOCK].revents = pfd[FD_TMRFD].revents = 0; + + pollres = poll(pfd, NUM_FDS, MDTM_TIPC_POLL_TIMEOUT); if (pollres > 0) { /* Check for EINTR and discard */ memset(&event, 0, sizeof(event)); osaf_mutex_lock_ordie(&gl_mds_library_mutex); - if (pfd[0].revents == POLLIN) { - if (recv(tipc_cb.Dsock, &event, sizeof(event), 0) != sizeof(event)) { - m_MDS_LOG_ERR("Unable to capture the recd event .. Continuing err :%s", strerror(errno)); - osaf_mutex_unlock_ordie(&gl_mds_library_mutex); - continue; - } else { - if (NTOHL(event.event) == TIPC_PUBLISHED) { - - m_MDS_LOG_INFO("MDTM: Published: "); - m_MDS_LOG_INFO("MDTM: <%u,%u,%u> port id <0x%08x:%u>\n", - NTOHL(event.s.seq.type), NTOHL(event.found_lower), - NTOHL(event.found_upper), NTOHL(event.port.node), - NTOHL(event.port.ref)); - - if ( NCSCC_RC_SUCCESS != mdtm_process_discovery_events(TIPC_PUBLISHED, - event)) { - m_MDS_LOG_INFO("MDTM: Published Event processing status: F"); + if (pfd[FD_DSOCK].revents == POLLIN) { + while(true) { + ssize_t rc = recv(tipc_cb.Dsock, &event, sizeof(event), MSG_DONTWAIT); + if (rc == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + break; + } else if (errno == EINTR) { + continue; + } else { + m_MDS_LOG_ERR("Unable to capture the recd event .. Continuing err :%s", strerror(errno)); + break; } - } else if (NTOHL(event.event) == TIPC_WITHDRAWN) { - m_MDS_LOG_INFO("MDTM: Withdrawn: "); - m_MDS_LOG_INFO("MDTM: <%u,%u,%u> port id <0x%08x:%u>\n", - NTOHL(event.s.seq.type), NTOHL(event.found_lower), - NTOHL(event.found_upper), NTOHL(event.port.node), - NTOHL(event.port.ref)); - - if ( NCSCC_RC_SUCCESS != mdtm_process_discovery_events(TIPC_WITHDRAWN, - event)) { - m_MDS_LOG_INFO("MDTM: Withdrawn event processing status: F"); + } else { + if (rc != sizeof(event)) { + m_MDS_LOG_ERR("Unable to capture the recd event .. Continuing err :%s", strerror(errno)); + break; + } else { + if (NTOHL(event.event) == TIPC_PUBLISHED) { + + m_MDS_LOG_INFO("MDTM: Published: "); + m_MDS_LOG_INFO("MDTM: <%u,%u,%u> port id <0x%08x:%u>\n", + NTOHL(event.s.seq.type), NTOHL(event.found_lower), + NTOHL(event.found_upper), NTOHL(event.port.node), + NTOHL(event.port.ref)); + + if ( NCSCC_RC_SUCCESS != mdtm_process_discovery_events(TIPC_PUBLISHED, + event)) { + m_MDS_LOG_INFO("MDTM: Published Event processing status: F"); + } + } else if (NTOHL(event.event) == TIPC_WITHDRAWN) { + m_MDS_LOG_INFO("MDTM: Withdrawn: "); + m_MDS_LOG_INFO("MDTM: <%u,%u,%u> port id <0x%08x:%u>\n", + NTOHL(event.s.seq.type), NTOHL(event.found_lower), + NTOHL(event.found_upper), NTOHL(event.port.node), + NTOHL(event.port.ref)); + + if ( NCSCC_RC_SUCCESS != mdtm_process_discovery_events(TIPC_WITHDRAWN, + event)) { + m_MDS_LOG_INFO("MDTM: Withdrawn event processing status: F"); + } + } else if (NTOHL(event.event) == TIPC_SUBSCR_TIMEOUT) { + /* As the timeout passed in infinite, No need to check for the Timeout */ + m_MDS_LOG_ERR("MDTM: Timeou Event"); + m_MDS_LOG_INFO("MDTM: Timeou Event"); + m_MDS_LOG_INFO("MDTM: <%u,%u,%u> port id <0x%08x:%u>\n", + NTOHL(event.s.seq.type), NTOHL(event.found_lower), + NTOHL(event.found_upper), NTOHL(event.port.node), + NTOHL(event.port.ref)); + } else { + m_MDS_LOG_ERR("MDTM: Unknown Event"); + /* This should never come */ + assert(0); + } } - } else if (NTOHL(event.event) == TIPC_SUBSCR_TIMEOUT) { - /* As the timeout passed in infinite, No need to check for the Timeout */ - m_MDS_LOG_ERR("MDTM: Timeou Event"); - m_MDS_LOG_INFO("MDTM: Timeou Event"); - m_MDS_LOG_INFO("MDTM: <%u,%u,%u> port id <0x%08x:%u>\n", - NTOHL(event.s.seq.type), NTOHL(event.found_lower), - NTOHL(event.found_upper), NTOHL(event.port.node), - NTOHL(event.port.ref)); - } else { - m_MDS_LOG_ERR("MDTM: Unknown Event"); - /* This should never come */ - assert(0); } } - } else if (pfd[0].revents & POLLHUP) { + } + if (pfd[FD_DSOCK].revents & POLLHUP) { /* This value is returned when the number of subscriptions made cross the tipc max_subscr limit, so no more connection to the tipc topserver is present(viz no more up/down events), so abort and exit the process */ - m_MDS_LOG_CRITICAL("MDTM: POLLHUP returned on Discovery Socket, No. of subscriptions=%d", - num_subscriptions); + m_MDS_LOG_CRITICAL("MDTM: POLLHUP returned on Discovery Socket, No. of subscriptions=%d", + num_subscriptions); abort(); /* This means, the process is use less as it has lost the connectivity with the topology server and will not be able to receive any UP/DOWN events */ - } else if (pfd[1].revents & POLLIN) { + } + if (pfd[FD_BSRSOCK].revents & POLLIN) { /* Data Received */ uint8_t *inbuf = tipc_cb.recvbuf; uint8_t *data; /* Used for decoding */ - uint16_t recd_bytes = 0; #ifdef MDS_CHECKSUM_ENABLE_FLAG uint16_t old_checksum = 0; uint16_t new_checksum = 0; @@ -743,78 +770,83 @@ static uint32_t mdtm_process_recv_events struct sockaddr_tipc client_addr; socklen_t alen = sizeof(client_addr); - uint16_t recd_buf_len = 0; m_MDS_LOG_INFO("MDTM: Data received: Processing data "); - recd_bytes = recvfrom_connectionless(tipc_cb.BSRsock, inbuf, TIPC_MAX_USER_MSG_SIZE, 0, - (struct sockaddr *)&client_addr, &alen); - if (recd_bytes == 0) { - m_MDS_LOG_DBG("MDTM: recd bytes=0 on received on sock, abnormal/unknown/hack condition. Ignoring"); - osaf_mutex_unlock_ordie(&gl_mds_library_mutex); - continue; + while (true) { + uint16_t recd_buf_len = 0; + ssize_t recd_bytes = recvfrom_connectionless(tipc_cb.BSRsock, inbuf, TIPC_MAX_USER_MSG_SIZE, MSG_DONTWAIT, + (struct sockaddr *)&client_addr, &alen); + if (recd_bytes == -1) { + m_MDS_LOG_DBG("MDTM: no more data to read"); + break; + } + if (recd_bytes == 0) { + m_MDS_LOG_DBG("MDTM: recd bytes=0 on received on sock, abnormal/unknown/hack condition. Ignoring"); + break; + } + data = inbuf; + + recd_buf_len = ncs_decode_16bit(&data); + + if (pfd[FD_BSRSOCK].revents & POLLERR) { + m_MDS_LOG_ERR("MDTM: Error Recd:tipc_id=<0x%08x:%u>:errno=0x%08x", + client_addr.addr.id.node, client_addr.addr.id.ref, errno); + } else if (recd_buf_len == recd_bytes) { + uint64_t tipc_id = 0; + uint32_t buff_dump = 0; + tipc_id = ((uint64_t)client_addr.addr.id.node) << 32; /* TIPC_ID=<NODE,REF> */ + tipc_id |= client_addr.addr.id.ref; + +#ifdef MDS_CHECKSUM_ENABLE_FLAG + if (inbuf[2] == 1) { + old_checksum = ((uint16_t)inbuf[3] << 8 | inbuf[4]); + inbuf[3] = 0; + inbuf[4] = 0; + new_checksum = mds_checksum(recd_bytes, inbuf); + + if (old_checksum != new_checksum) { + m_MDS_LOG_ERR + ("CHECKSUM-MISMATCH:recvd_on_sock=%zd, Tipc_id=0x%llu, Adest = <%08x,%u>", + recd_bytes, tipc_id, + m_MDS_GET_NCS_NODE_ID_FROM_TIPC_NODE_ID(client_addr.addr. + id.node), + client_addr.addr.id.ref); + mds_buff_dump(inbuf, recd_bytes, 100); + abort(); + break; + } + mdtm_process_recv_data(&inbuf[5], recd_bytes - 5, tipc_id, &buff_dump); + if (buff_dump) { + m_MDS_LOG_ERR + ("RECV_DATA_PROCESS:recvd_on_sock=%zd, Tipc_id=0x%llu, Adest = <%08x,%u>", + recd_bytes, tipc_id, + m_MDS_GET_NCS_NODE_ID_FROM_TIPC_NODE_ID(client_addr.addr. + id.node), + client_addr.addr.id.ref); + mds_buff_dump(inbuf, recd_bytes, 100); + } + } else { + mdtm_process_recv_data(&inbuf[5], recd_bytes - 5, tipc_id, &buff_dump); + } +#else + mdtm_process_recv_data(&inbuf[2], recd_bytes - 2, tipc_id, &buff_dump); +#endif + } else { + uint64_t tipc_id; + tipc_id = ((uint64_t)client_addr.addr.id.node) << 32; /* TIPC_ID=<NODE,REF> */ + tipc_id |= client_addr.addr.id.ref; + + /* Log message that we are dropping the data */ + m_MDS_LOG_ERR + ("LEN-MISMATCH:recvd_on_sock=%zd, size_in_mds_hdr=%d, Tipc_id= %"PRIu64", Adest = <%08x,%u>", + recd_bytes, recd_buf_len, tipc_id, + m_MDS_GET_NCS_NODE_ID_FROM_TIPC_NODE_ID(client_addr.addr.id.node), + client_addr.addr.id.ref); + mds_buff_dump(inbuf, recd_bytes, 100); + } } - data = inbuf; - - recd_buf_len = ncs_decode_16bit(&data); - - if (pfd[1].revents & POLLERR) { - m_MDS_LOG_ERR("MDTM: Error Recd:tipc_id=<0x%08x:%u>:errno=0x%08x", - client_addr.addr.id.node, client_addr.addr.id.ref, errno); - } else if (recd_buf_len == recd_bytes) { - uint64_t tipc_id = 0; - uint32_t buff_dump = 0; - tipc_id = ((uint64_t)client_addr.addr.id.node) << 32; /* TIPC_ID=<NODE,REF> */ - tipc_id |= client_addr.addr.id.ref; - -#ifdef MDS_CHECKSUM_ENABLE_FLAG - if (inbuf[2] == 1) { - old_checksum = ((uint16_t)inbuf[3] << 8 | inbuf[4]); - inbuf[3] = 0; - inbuf[4] = 0; - new_checksum = mds_checksum(recd_bytes, inbuf); - - if (old_checksum != new_checksum) { - m_MDS_LOG_ERR - ("CHECKSUM-MISMATCH:recvd_on_sock=%d, Tipc_id=0x%llu, Adest = <%08x,%u>", - recd_bytes, tipc_id, - m_MDS_GET_NCS_NODE_ID_FROM_TIPC_NODE_ID(client_addr.addr. - id.node), - client_addr.addr.id.ref); - mds_buff_dump(inbuf, recd_bytes, 100); - abort(); - osaf_mutex_unlock_ordie(&gl_mds_library_mutex); - continue; - } - mdtm_process_recv_data(&inbuf[5], recd_bytes - 5, tipc_id, &buff_dump); - if (buff_dump) { - m_MDS_LOG_ERR - ("RECV_DATA_PROCESS:recvd_on_sock=%d, Tipc_id=0x%llu, Adest = <%08x,%u>", - recd_bytes, tipc_id, - m_MDS_GET_NCS_NODE_ID_FROM_TIPC_NODE_ID(client_addr.addr. - id.node), - client_addr.addr.id.ref); - mds_buff_dump(inbuf, recd_bytes, 100); - } - } else { - mdtm_process_recv_data(&inbuf[5], recd_bytes - 5, tipc_id, &buff_dump); - } -#else - mdtm_process_recv_data(&inbuf[2], recd_bytes - 2, tipc_id, &buff_dump); -#endif - } else { - uint64_t tipc_id; - tipc_id = ((uint64_t)client_addr.addr.id.node) << 32; /* TIPC_ID=<NODE,REF> */ - tipc_id |= client_addr.addr.id.ref; - - /* Log message that we are dropping the data */ - m_MDS_LOG_ERR - ("LEN-MISMATCH:recvd_on_sock=%d, size_in_mds_hdr=%d, Tipc_id= %"PRIu64", Adest = <%08x,%u>", - recd_bytes, recd_buf_len, tipc_id, - m_MDS_GET_NCS_NODE_ID_FROM_TIPC_NODE_ID(client_addr.addr.id.node), - client_addr.addr.id.ref); - mds_buff_dump(inbuf, recd_bytes, 100); - } - } else if (pfd[2].revents == POLLIN) { + } + if (pfd[FD_TMRFD].revents == POLLIN) { m_MDS_LOG_INFO("MDTM: Processing Timer mailbox events\n"); /* Check if destroy-event has been processed */ ------------------------------------------------------------------------------ The Command Line: Reinvented for Modern Developers Did the resurgence of CLI tooling catch you by surprise? Reconnect with the command line and become more productive. Learn the new .NET and ASP.NET CLI. Get your free copy! http://sdm.link/telerik _______________________________________________ Opensaf-devel mailing list Opensaf-devel@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/opensaf-devel