Hi Mahesh,
Great, the suggestion below looks good. I will send out a V4 shortly.
/Thanks HansN
On 12/07/2016 05:29 AM, A V Mahesh wrote:
> Hi HansN,
>
> I did some investigation. My first observation is that the issues occur
> because of delayed processing of control events (FD_DSOCK == POLLIN),
> timer events (FD_TMRFD == POLLIN) and the discovery socket losing
> connectivity with the topology server (FD_DSOCK & POLLHUP). This happens
> when the receive thread is held up in the data message loop of up to
> 1024 messages. The fix is therefore to break out of the data message
> processing loop whenever a priority control, timer or connectivity-lost
> event is pending.
>
> So we need to replace the key logic of the patch ("count messages and
> break out of the receive loop after 1024 messages have been received,
> even if there are still more messages on the socket") with alternative
> logic. The new logic will also address the out-of-order delivery caused
> by the MDS library.
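Below is a minimal sketch of the break-on-priority idea described above. It assumes a plain POSIX/Linux environment. The descriptors are stand-ins for Dsock, BSRsock and tmr_fd, and `priority_event_pending()` and `drain_data()` are hypothetical helpers, not MDS functions. It differs from a check on the revents values saved by the outer poll(): after each data message it re-polls the priority descriptors with a zero timeout, so it also notices events that arrive while the loop is running.

```c
#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <stdbool.h>
#include <sys/socket.h>
#include <sys/types.h>

/* True if either priority descriptor (stand-ins for Dsock and tmr_fd)
 * currently reports an event: readable, hung up, or in error.
 * A zero timeout makes poll() return immediately, so the answer reflects
 * the state now, not the revents snapshot taken before the data loop. */
static bool priority_event_pending(int dsock, int tmr_fd)
{
	struct pollfd pfd[2] = {
		{ .fd = dsock, .events = POLLIN },
		{ .fd = tmr_fd, .events = POLLIN },
	};
	int rc;

	do {
		rc = poll(pfd, 2, 0);
	} while (rc == -1 && errno == EINTR);
	return rc > 0;
}

/* Drain datagrams from data_fd (stand-in for BSRsock) without blocking.
 * After each message, stop early if a priority event is pending, so the
 * caller can return to its main poll() loop and service that first.
 * Returns the number of datagrams consumed. */
static int drain_data(int data_fd, int dsock, int tmr_fd)
{
	char buf[512];
	int handled = 0;

	for (;;) {
		ssize_t n = recv(data_fd, buf, sizeof(buf), MSG_DONTWAIT);
		if (n == -1) {
			if (errno == EINTR)
				continue;
			break; /* EAGAIN/EWOULDBLOCK: drained; other errors: stop */
		}
		handled++; /* real code would process buf[0..n) here */
		if (priority_event_pending(dsock, tmr_fd))
			break;
	}
	return handled;
}
```

The caller would invoke this once the outer poll() reports the data socket readable. It returns early whenever a topology or timer event shows up mid-drain. Whether one extra poll() per message is affordable under heavy load would need measuring.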
>
> Please make the following changes, rebase, and send a new version of the
> patch, so that I can continue testing and we can conclude this patch.
> If you are busy with another ticket, please let me know and I will drive
> the patch.
>
> 1) Replace this:
> ===============================================================================================
> On 11/22/2016 3:07 PM, Hans Nordeback wrote:
> > + mds_buff_dump(inbuf, recd_bytes, 100);
> > + }
> > + if (++recv_ctr > MAX_RECV_THRESHOLD) {
> > + break;
> > + }
> ===============================================================================================
>
> with the following:
> ===============================================================================================
> + mds_buff_dump(inbuf, recd_bytes, 100);
> + }
> + if ((pfd[FD_DSOCK].revents == POLLIN) ||
> +     (pfd[FD_DSOCK].revents & POLLHUP) ||
> +     (pfd[FD_TMRFD].revents == POLLIN)) {
> + m_MDS_LOG_INFO("MDTM: break (pfd[FD_BSRSOCK].revents & POLLIN) Loop..");
> + break;
> + }
> ===============================================================================================
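The break condition proposed in 1) can be isolated as a predicate for illustration. `should_break_data_loop()` is a hypothetical helper, not part of the patch. It reuses the FD_DSOCK/FD_BSRSOCK/FD_TMRFD indices that the patch defines. poll() fills in `revents` once per call, so this check only sees events that were already pending when the outer poll() returned.

```c
#include <assert.h>
#include <poll.h>
#include <stdbool.h>

/* Same index names as the enum in mdtm_process_recv_events(). */
enum { FD_DSOCK = 0, FD_BSRSOCK, FD_TMRFD, NUM_FDS };

/* Break condition from suggestion 1): leave the data loop if the outer
 * poll() also reported a topology event, a hang-up on the discovery
 * socket, or a timer expiry. revents is only as fresh as that poll(). */
static bool should_break_data_loop(const struct pollfd pfd[NUM_FDS])
{
	return (pfd[FD_DSOCK].revents == POLLIN) ||
	       (pfd[FD_DSOCK].revents & POLLHUP) ||
	       (pfd[FD_TMRFD].revents == POLLIN);
}
```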
>
>
>
> 2) Replace this:
> ===============================================================================================
> On 11/22/2016 3:07 PM, Hans Nordeback wrote:
> - assert(0);
> + }
> + if (++recv_ctr > MAX_RECV_THRESHOLD) {
> + break;
> }
> ===============================================================================================
>
> with the following:
> ===============================================================================================
> - assert(0);
> + }
> + if (pfd[FD_DSOCK].revents & POLLHUP) {
> + m_MDS_LOG_INFO("MDTM: break (pfd[FD_DSOCK].revents == POLLIN) Loop..");
> + break;
> }
> ===============================================================================================
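`revents` is a bit mask, so `== POLLIN` and `& POLLIN` are not interchangeable. When poll() reports POLLIN and POLLHUP together, only the bitwise test is true. The sketch below uses hypothetical helper names to show the difference. In the quoted patch, the discovery-socket loop is entered only when `pfd[FD_DSOCK].revents == POLLIN`. Unless `revents` is refreshed inside that loop, the POLLHUP bit would therefore always be clear where the check proposed in 2) runs.

```c
#include <assert.h>
#include <poll.h>
#include <stdbool.h>

/* poll() may report several conditions in one revents value. */
static bool only_pollin(short revents) { return revents == POLLIN; }
static bool has_pollin(short revents) { return (revents & POLLIN) != 0; }
static bool has_pollhup(short revents) { return (revents & POLLHUP) != 0; }

/* Mirrors the guard around the Dsock loop in the patch: the POLLHUP test
 * inside the loop only runs when revents is exactly POLLIN. */
static bool pollhup_seen_inside_dsock_loop(short revents)
{
	if (only_pollin(revents))        /* loop entry condition */
		return has_pollhup(revents); /* check proposed in 2) */
	return false;                    /* loop not entered */
}
```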
>
>
>
> 3) Remove the added lines `+#define MAX_RECV_THRESHOLD 1024` and
> `+ unsigned int recv_ctr = 0;`.
>
> -AVM
>
>
> On 11/23/2016 11:07 AM, A V Mahesh wrote:
>> Hi HansN,
>>
>> In my testing with a concurrent application that sends large amounts of
>> data and generates a large number of up/down events, the cluster goes for
>> a reboot, as I expected, because some MDS applications do not get a chance
>> to process their messages within the expected time (see the logs below).
>> For example: `osafamfnd[18568]: ER AMF director heart beat timeout,
>> generating core for amfd` and `osaffmd[20377]: Rebooting OpenSAF NodeId = 0
>> EE Name = No EE Mapped, Reason: Activation timer supervision expired: no
>> ACTIVE assignment received within the time limit`.
>> Unfortunately, no MDS logs are written at the time of the event (logging
>> got stuck some time earlier), so please hold this patch. I will find a way
>> to root-cause the problem by doing different kinds of testing.
>>
>> ==============================================================================================
>>
>>
>>
>> Nov 23 10:46:27 SC-2 osafimmnd[18324]: NO Ccb 4807 COMMITTED (immcfg_PL-4_32406)
>> Nov 23 10:46:27 SC-2 osafimmnd[18324]: NO Ccb 4808 COMMITTED (immcfg_PL-4_32409)
>> Nov 23 10:46:27 SC-2 osafimmnd[18324]: NO Ccb 4809 COMMITTED (immcfg_PL-4_32412)
>> Nov 23 10:46:27 SC-2 osafimmnd[18324]: NO Ccb 4810 COMMITTED (immcfg_PL-4_32415)
>> Nov 23 10:46:27 SC-2 osafimmnd[18324]: NO Ccb 4811 COMMITTED (immcfg_PL-4_32418)
>> Nov 23 10:46:27 SC-2 osafamfnd[18568]: ER AMF director heart beat timeout, generating core for amfd
>> Nov 23 10:46:28 SC-2 osafamfnd[18568]: Rebooting OpenSAF NodeId = 131599 EE Name = , Reason: AMF director heart beat timeout, OwnNodeId = 131599, SupervisionTime = 60
>> Nov 23 10:46:28 SC-2 opensaf_reboot: Rebooting local node; timeout=60
>> Nov 23 10:46:29 SC-2 osafimmnd[18324]: WA MDS Send Failed
>> Nov 23 10:46:29 SC-2 osafimmnd[18324]: WA Error code 2 returned for message type 16 - ignoring
>> Nov 23 10:47:09 SC-2 syslog-ng[1256]: syslog-ng starting up; version='2.0.9'
>> Nov 23 10:47:10 SC-2 ifup: lo
>>
>> Nov 23 10:46:22 SC-1 osafimmnd[20398]: NO Ccb 241 COMMITTED (immcfg_PL-4_18950)
>> Nov 23 10:46:22 SC-1 osafimmnd[20398]: NO Ccb 242 COMMITTED (immcfg_PL-4_18953)
>> Nov 23 10:46:22 SC-1 osafimmnd[20398]: NO Ccb 243 COMMITTED (immcfg_PL-4_18956)
>> Nov 23 10:46:22 SC-1 osafimmnd[20398]: NO Ccb 244 COMMITTED (immcfg_PL-4_18959)
>> Nov 23 10:46:22 SC-1 osaffmd[20377]: Rebooting OpenSAF NodeId = 0 EE Name = No EE Mapped, Reason: Activation timer supervision expired: no ACTIVE assignment received within the time limit, OwnNodeId = 131343, SupervisionTime = 60
>> Nov 23 10:46:22 SC-1 osafimmnd[20398]: NO Ccb 245 COMMITTED (immcfg_PL-4_18962)
>> Nov 23 10:46:22 SC-1 osafimmnd[20398]: NO Ccb 246 COMMITTED (immcfg_PL-4_18965)
>> Nov 23 10:46:22 SC-1 osafimmnd[20398]: NO Ccb 247 COMMITTED (immcfg_PL-4_18968)
>> Nov 23 10:46:22 SC-1 osafimmnd[20398]: NO Ccb 255 COMMITTED (immcfg_PL-4_18989)
>> Nov 23 10:46:22 SC-1 osafimmnd[20398]: NO Ccb 256 COMMITTED (immcfg_PL-4_18992)
>> Nov 23 10:46:22 SC-1 osafimmnd[20398]: NO Ccb 257 COMMITTED (immcfg_PL-4_18995)
>> Nov 23 10:46:22 SC-1 osafimmnd[20398]: NO Ccb 258 COMMITTED (immcfg_PL-4_18998)
>> Nov 23 10:46:22 SC-1 osafimmnd[20398]: NO Ccb 251 COMMITTED (immcfg_SC-1_20807)
>> Nov 23 10:46:22 SC-1 opensaf_reboot: Rebooting local node; timeout=60
>>
>> ==============================================================================================
>>
>>
>>
>> -AVM
>>
>>
>> On 11/22/2016 3:07 PM, Hans Nordeback wrote:
>>> osaf/libs/core/mds/mds_dt_tipc.c | 389 +++++++++++++++++++++-----------------
>>> 1 files changed, 216 insertions(+), 173 deletions(-)
>>>
>>>
>>> This first part makes the receive calls on the socket descriptors
>>> non-blocking, so that all incoming messages are read before returning to
>>> the next poll() call.
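Below is a minimal sketch of the non-blocking receive pattern described here, assuming Linux/POSIX sockets. Each recvmsg() call passes MSG_DONTWAIT, EINTR is retried, and EAGAIN/EWOULDBLOCK ends the drain. `recv_one_dontwait()` and `drain_socket()` are hypothetical names. Unlike the patch, the sketch returns on any other error instead of aborting, and it does not parse TIPC ancillary data.

```c
#include <assert.h>
#include <errno.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>

/* One non-blocking receive: retries on EINTR and returns -1 with errno
 * set to EAGAIN/EWOULDBLOCK when nothing is queued, in the spirit of the
 * reworked recvfrom_connectionless() in the patch. */
static ssize_t recv_one_dontwait(int sd, void *buf, size_t len)
{
	struct iovec iov = { .iov_base = buf, .iov_len = len };
	struct msghdr msg;

	memset(&msg, 0, sizeof(msg));
	msg.msg_iov = &iov;
	msg.msg_iovlen = 1;
	for (;;) {
		ssize_t sz = recvmsg(sd, &msg, MSG_DONTWAIT);
		if (sz == -1 && errno == EINTR)
			continue;
		return sz;
	}
}

/* Read every queued datagram before going back to poll(). errno tells
 * EAGAIN/EWOULDBLOCK (queue drained) apart from a real error. */
static int drain_socket(int sd)
{
	char buf[256];
	int count = 0;

	while (recv_one_dontwait(sd, buf, sizeof(buf)) >= 0)
		count++;
	return count;
}
```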
>>>
>>> diff --git a/osaf/libs/core/mds/mds_dt_tipc.c b/osaf/libs/core/mds/mds_dt_tipc.c
>>> --- a/osaf/libs/core/mds/mds_dt_tipc.c
>>> +++ b/osaf/libs/core/mds/mds_dt_tipc.c
>>> @@ -142,6 +142,8 @@ static MDS_SUBTN_REF_VAL handle;
>>> static uint16_t num_subscriptions;
>>> uint32_t mdtm_global_frag_num;
>>> +
>>> +#define MAX_RECV_THRESHOLD 1024
>>> /*********************************************************
>>> Function NAME: mdtm_tipc_init
>>> @@ -541,15 +543,13 @@ static uint32_t mdtm_destroy_rcv_task(vo
>>> ssize_t recvfrom_connectionless (int sd, void *buf, size_t nbytes, int flags,
>>> struct sockaddr *from, socklen_t *addrlen)
>>> {
>>> - struct msghdr msg;
>>> - struct iovec iov;
>>> + struct msghdr msg = {0};
>>> + struct iovec iov = {0};
>>> char anc_buf[CMSG_SPACE(8) + CMSG_SPACE(1024) + CMSG_SPACE(12)];
>>> struct cmsghdr *anc;
>>> int has_addr;
>>> int anc_data[2];
>>> - ssize_t sz;
>>> -
>>> has_addr = (from != NULL) && (addrlen != NULL);
>>> iov.iov_base = buf;
>>> @@ -562,55 +562,63 @@ ssize_t recvfrom_connectionless (int sd,
>>> msg.msg_control = anc_buf;
>>> msg.msg_controllen = sizeof(anc_buf);
>>> - sz = recvmsg(sd, &msg, flags);
>>> - if (sz >= 0) {
>>> - anc = CMSG_FIRSTHDR(&msg);
>>> - if (anc == NULL) {
>>> - m_MDS_LOG_DBG("MDTM: size: %d anc is NULL", (int)sz);
>>> + while (true) {
>>> + ssize_t sz = recvmsg(sd, &msg, flags);
>>> + if (sz == -1) {
>>> + if (errno == EAGAIN || errno == EWOULDBLOCK) {
>>> + return sz;
>>> + } else if (errno == EINTR) {
>>> + continue;
>>> + } else {
>>> + /* -1 indicates connection termination connectionless this not possible */
>>> + osaf_abort(sz);
>>> + }
>>> }
>>> - while (anc != NULL) {
>>> -
>>> - /* Receipt of a normal data message never creates the TIPC_ERRINFO
>>> - and TIPC_RETDATA objects, and only creates the TIPC_DESTNAME object
>>> - if the message was sent using a TIPC name or name sequence as the
>>> - destination rather than a TIPC port ID*/
>>> - if (anc->cmsg_type == TIPC_ERRINFO) {
>>> - anc_data[0] = *((unsigned int*)(CMSG_DATA(anc) + 0));
>>> - if (anc_data[0] == TIPC_ERR_OVERLOAD) {
>>> - LOG_CR("MDTM: undelivered message condition ancillary data: TIPC_ERR_OVERLOAD");
>>> - m_MDS_LOG_CRITICAL("MDTM: undelivered message condition ancillary data: TIPC_ERR_OVERLOAD");
>>> + if (sz >= 0) {
>>> + anc = CMSG_FIRSTHDR(&msg);
>>> + if (anc == NULL) {
>>> + m_MDS_LOG_DBG("MDTM: size: %d anc is NULL", (int)sz);
>>> + }
>>> + while (anc != NULL) {
>>> +
>>> + /* Receipt of a normal data message never creates the TIPC_ERRINFO
>>> + and TIPC_RETDATA objects, and only creates the TIPC_DESTNAME object
>>> + if the message was sent using a TIPC name or name sequence as the
>>> + destination rather than a TIPC port ID*/
>>> + if (anc->cmsg_type == TIPC_ERRINFO) {
>>> + anc_data[0] = *((unsigned int*)(CMSG_DATA(anc) + 0));
>>> + if (anc_data[0] == TIPC_ERR_OVERLOAD) {
>>> + LOG_CR("MDTM: undelivered message condition ancillary data: TIPC_ERR_OVERLOAD");
>>> + m_MDS_LOG_CRITICAL("MDTM: undelivered message condition ancillary data: TIPC_ERR_OVERLOAD");
>>> + } else {
>>> + /* TIPC_ERRINFO - TIPC error code associated with a returned data message or a connection termination message */
>>> + m_MDS_LOG_DBG("MDTM: undelivered message condition ancillary data: TIPC_ERRINFO err : %d", anc_data[0]);
>>> + }
>>> + } else if (anc->cmsg_type == TIPC_RETDATA) {
>>> + /* If we set TIPC_DEST_DROPPABLE off message (configure TIPC to return rejected messages to the sender )
>>> + we will hit this when we implement MDS retransmit lost messages, can be replaced with flow control logic */
>>> + /* TIPC_RETDATA -The contents of a returned data message */
>>> + LOG_CR("MDTM: undelivered message condition ancillary data: TIPC_RETDATA");
>>> + m_MDS_LOG_CRITICAL("MDTM: undelivered message condition ancillary data: TIPC_RETDATA");
>>> + } else if (anc->cmsg_type == TIPC_DESTNAME) {
>>> + if (sz == 0) {
>>> + m_MDS_LOG_DBG("MDTM: recd bytes=0 on received on sock, abnormal/unknown condition. Ignoring");
>>> + }
>>> } else {
>>> - /* TIPC_ERRINFO - TIPC error code associated with a returned data message or a connection termination message */
>>> - m_MDS_LOG_DBG("MDTM: undelivered message condition ancillary data: TIPC_ERRINFO err : %d", anc_data[0]);
>>> + m_MDS_LOG_INFO("MDTM: unrecognized ancillary data type %u\n", anc->cmsg_type);
>>> + if (sz == 0) {
>>> + m_MDS_LOG_DBG("MDTM: recd bytes=0 on received on sock, abnormal/unkown condition. Ignoring");
>>> + }
>>> }
>>> - } else if (anc->cmsg_type == TIPC_RETDATA) {
>>> - /* If we set TIPC_DEST_DROPPABLE off message (configure TIPC to return rejected messages to the sender )
>>> - we will hit this when we implement MDS retransmit lost messages, can be replaced with flow control logic */
>>> - /* TIPC_RETDATA -The contents of a returned data message */
>>> - LOG_CR("MDTM: undelivered message condition ancillary data: TIPC_RETDATA");
>>> - m_MDS_LOG_CRITICAL("MDTM: undelivered message condition ancillary data: TIPC_RETDATA");
>>> - } else if (anc->cmsg_type == TIPC_DESTNAME) {
>>> - if (sz == 0) {
>>> - m_MDS_LOG_DBG("MDTM: recd bytes=0 on received on sock, abnormal/unknown condition. Ignoring");
>>> - }
>>> - } else {
>>> - m_MDS_LOG_INFO("MDTM: unrecognized ancillary data type %u\n", anc->cmsg_type);
>>> - if (sz == 0) {
>>> - m_MDS_LOG_DBG("MDTM: recd bytes=0 on received on sock, abnormal/unkown condition. Ignoring");
>>> - }
>>> +
>>> + anc = CMSG_NXTHDR(&msg, anc);
>>> }
>>> - anc = CMSG_NXTHDR(&msg, anc);
>>> + if (has_addr)
>>> + *addrlen = msg.msg_namelen;
>>> }
>>> -
>>> - if (has_addr)
>>> - *addrlen = msg.msg_namelen;
>>> - } else {
>>> - /* -1 indicates connection termination connectionless this not possible */
>>> - abort();
>>> + return sz;
>>> }
>>> -
>>> - return sz;
>>> }
>>> /*********************************************************
>>> @@ -627,91 +635,117 @@ ssize_t recvfrom_connectionless (int sd,
>>> *********************************************************/
>>> static uint32_t mdtm_process_recv_events(void)
>>> {
>>> + enum {
>>> + FD_DSOCK = 0,
>>> + FD_BSRSOCK,
>>> + FD_TMRFD,
>>> + NUM_FDS
>>> + };
>>> +
>>> /*
>>> STEP 1: Poll on the BSRsock and Dsock to get the events
>>> if data is received process the received data
>>> if discovery events are received , process the discovery events
>>> */
>>> - while (1) {
>>> + while (true) {
>>> + unsigned int recv_ctr = 0;
>>> unsigned int pollres;
>>> - struct pollfd pfd[3];
>>> + struct pollfd pfd[NUM_FDS] = {{0}};
>>> struct tipc_event event;
>>> - pfd[0].fd = tipc_cb.Dsock;
>>> - pfd[0].events = POLLIN;
>>> - pfd[1].fd = tipc_cb.BSRsock;
>>> - pfd[1].events = POLLIN;
>>> - pfd[2].fd = tipc_cb.tmr_fd;
>>> - pfd[2].events = POLLIN;
>>> -
>>> - pfd[0].revents = pfd[1].revents = pfd[2].revents = 0;
>>> -
>>> - pollres = poll(pfd, 3, MDTM_TIPC_POLL_TIMEOUT);
>>> + pfd[FD_DSOCK].fd = tipc_cb.Dsock;
>>> + pfd[FD_DSOCK].events = POLLIN;
>>> + pfd[FD_BSRSOCK].fd = tipc_cb.BSRsock;
>>> + pfd[FD_BSRSOCK].events = POLLIN;
>>> + pfd[FD_TMRFD].fd = tipc_cb.tmr_fd;
>>> + pfd[FD_TMRFD].events = POLLIN;
>>> +
>>> + pfd[FD_DSOCK].revents = pfd[FD_BSRSOCK].revents = pfd[FD_TMRFD].revents = 0;
>>> +
>>> + pollres = poll(pfd, NUM_FDS, MDTM_TIPC_POLL_TIMEOUT);
>>> if (pollres > 0) { /* Check for EINTR and discard */
>>> memset(&event, 0, sizeof(event));
>>> osaf_mutex_lock_ordie(&gl_mds_library_mutex);
>>> - if (pfd[0].revents == POLLIN) {
>>> - if (recv(tipc_cb.Dsock, &event, sizeof(event), 0) != sizeof(event)) {
>>> - m_MDS_LOG_ERR("Unable to capture the recd event .. Continuing err :%s", strerror(errno));
>>> - osaf_mutex_unlock_ordie(&gl_mds_library_mutex);
>>> - continue;
>>> - } else {
>>> - if (NTOHL(event.event) == TIPC_PUBLISHED) {
>>> -
>>> - m_MDS_LOG_INFO("MDTM: Published: ");
>>> - m_MDS_LOG_INFO("MDTM: <%u,%u,%u> port id <0x%08x:%u>\n",
>>> - NTOHL(event.s.seq.type), NTOHL(event.found_lower),
>>> - NTOHL(event.found_upper), NTOHL(event.port.node),
>>> - NTOHL(event.port.ref));
>>> -
>>> - if ( NCSCC_RC_SUCCESS != mdtm_process_discovery_events(TIPC_PUBLISHED,
>>> - event)) {
>>> - m_MDS_LOG_INFO("MDTM: Published Event processing status: F");
>>> + if (pfd[FD_DSOCK].revents == POLLIN) {
>>> + recv_ctr = 0;
>>> + while (true) {
>>> + ssize_t rc = recv(tipc_cb.Dsock, &event, sizeof(event), MSG_DONTWAIT);
>>> + if (rc == -1) {
>>> + if (errno == EAGAIN || errno == EWOULDBLOCK) {
>>> + break;
>>> + } else if (errno == EINTR) {
>>> + continue;
>>> + } else {
>>> + m_MDS_LOG_ERR("Unable to capture the recd event .. Continuing err :%s", strerror(errno));
>>> + break;
>>> }
>>> - } else if (NTOHL(event.event) == TIPC_WITHDRAWN) {
>>> - m_MDS_LOG_INFO("MDTM: Withdrawn: ");
>>> - m_MDS_LOG_INFO("MDTM: <%u,%u,%u> port id <0x%08x:%u>\n",
>>> - NTOHL(event.s.seq.type), NTOHL(event.found_lower),
>>> - NTOHL(event.found_upper), NTOHL(event.port.node),
>>> - NTOHL(event.port.ref));
>>> -
>>> - if ( NCSCC_RC_SUCCESS != mdtm_process_discovery_events(TIPC_WITHDRAWN,
>>> - event)) {
>>> - m_MDS_LOG_INFO("MDTM: Withdrawn event processing status: F");
>>> + } else {
>>> + if (rc != sizeof(event)) {
>>> + m_MDS_LOG_ERR("Unable to capture the recd event .. Continuing err :%s", strerror(errno));
>>> + break;
>>> + } else {
>>> + if (NTOHL(event.event) == TIPC_PUBLISHED) {
>>> +
>>> + m_MDS_LOG_INFO("MDTM: Published: ");
>>> + m_MDS_LOG_INFO("MDTM: <%u,%u,%u> port id <0x%08x:%u>\n",
>>> + NTOHL(event.s.seq.type), NTOHL(event.found_lower),
>>> + NTOHL(event.found_upper), NTOHL(event.port.node),
>>> + NTOHL(event.port.ref));
>>> +
>>> + if (NCSCC_RC_SUCCESS != mdtm_process_discovery_events(TIPC_PUBLISHED,
>>> + event)) {
>>> + m_MDS_LOG_INFO("MDTM: Published Event processing status: F");
>>> + }
>>> + } else if (NTOHL(event.event) == TIPC_WITHDRAWN) {
>>> + m_MDS_LOG_INFO("MDTM: Withdrawn: ");
>>> + m_MDS_LOG_INFO("MDTM: <%u,%u,%u> port id <0x%08x:%u>\n",
>>> + NTOHL(event.s.seq.type), NTOHL(event.found_lower),
>>> + NTOHL(event.found_upper), NTOHL(event.port.node),
>>> + NTOHL(event.port.ref));
>>> +
>>> + if (NCSCC_RC_SUCCESS != mdtm_process_discovery_events(TIPC_WITHDRAWN,
>>> + event)) {
>>> + m_MDS_LOG_INFO("MDTM: Withdrawn event processing status: F");
>>> + }
>>> + } else if (NTOHL(event.event) == TIPC_SUBSCR_TIMEOUT) {
>>> + /* As the timeout passed in infinite, No need to check for the Timeout */
>>> + m_MDS_LOG_ERR("MDTM: Timeou Event");
>>> + m_MDS_LOG_INFO("MDTM: Timeou Event");
>>> + m_MDS_LOG_INFO("MDTM: <%u,%u,%u> port id <0x%08x:%u>\n",
>>> + NTOHL(event.s.seq.type), NTOHL(event.found_lower),
>>> + NTOHL(event.found_upper), NTOHL(event.port.node),
>>> + NTOHL(event.port.ref));
>>> + } else {
>>> + m_MDS_LOG_ERR("MDTM: Unknown Event");
>>> + /* This should never come */
>>> + osafassert(0);
>>> + }
>>> }
>>> - } else if (NTOHL(event.event) == TIPC_SUBSCR_TIMEOUT) {
>>> - /* As the timeout passed in infinite, No need to check for the Timeout */
>>> - m_MDS_LOG_ERR("MDTM: Timeou Event");
>>> - m_MDS_LOG_INFO("MDTM: Timeou Event");
>>> - m_MDS_LOG_INFO("MDTM: <%u,%u,%u> port id <0x%08x:%u>\n",
>>> - NTOHL(event.s.seq.type), NTOHL(event.found_lower),
>>> - NTOHL(event.found_upper), NTOHL(event.port.node),
>>> - NTOHL(event.port.ref));
>>> - } else {
>>> - m_MDS_LOG_ERR("MDTM: Unknown Event");
>>> - /* This should never come */
>>> - assert(0);
>>> + }
>>> + if (++recv_ctr > MAX_RECV_THRESHOLD) {
>>> + break;
>>> }
>>> }
>>> - } else if (pfd[0].revents & POLLHUP) {
>>> + }
>>> + if (pfd[FD_DSOCK].revents & POLLHUP) {
>>> /* This value is returned when the number of subscriptions made cross the tipc max_subscr limit,
>>> so no more connection to the tipc topserver is present(viz no more up/down events),
>>> so abort and exit the process */
>>> - m_MDS_LOG_CRITICAL("MDTM: POLLHUP returned on Discovery Socket, No. of subscriptions=%d",
>>> - num_subscriptions);
>>> - abort(); /* This means, the process is use less as
>>> + m_MDS_LOG_CRITICAL("MDTM: POLLHUP returned on Discovery Socket, No. of subscriptions=%d",
>>> + num_subscriptions);
>>> + osaf_abort(pfd[FD_DSOCK].revents); /* This means, the process is use less as
>>> it has lost the connectivity with the topology server
>>> and will not be able to receive any UP/DOWN events */
>>> - } else if (pfd[1].revents & POLLIN) {
>>> + }
>>> + if (pfd[FD_BSRSOCK].revents & POLLIN) {
>>> /* Data Received */
>>> uint8_t *inbuf = tipc_cb.recvbuf;
>>> uint8_t *data; /* Used for decoding */
>>> - uint16_t recd_bytes = 0;
>>> #ifdef MDS_CHECKSUM_ENABLE_FLAG
>>> uint16_t old_checksum = 0;
>>> uint16_t new_checksum = 0;
>>> @@ -719,78 +753,87 @@ static uint32_t mdtm_process_recv_events
>>> struct sockaddr_tipc client_addr;
>>> socklen_t alen = sizeof(client_addr);
>>> - uint16_t recd_buf_len = 0;
>>> m_MDS_LOG_INFO("MDTM: Data received: Processing data ");
>>> - recd_bytes = recvfrom_connectionless(tipc_cb.BSRsock, inbuf, TIPC_MAX_USER_MSG_SIZE, 0,
>>> - (struct sockaddr *)&client_addr, &alen);
>>> - if (recd_bytes == 0) {
>>> - m_MDS_LOG_DBG("MDTM: recd bytes=0 on received on sock, abnormal/unknown/hack condition. Ignoring");
>>> - osaf_mutex_unlock_ordie(&gl_mds_library_mutex);
>>> - continue;
>>> + recv_ctr = 0;
>>> + while (true) {
>>> + uint16_t recd_buf_len = 0;
>>> + ssize_t recd_bytes = recvfrom_connectionless(tipc_cb.BSRsock, inbuf, TIPC_MAX_USER_MSG_SIZE, MSG_DONTWAIT,
>>> + (struct sockaddr *)&client_addr, &alen);
>>> + if (recd_bytes == -1) {
>>> + m_MDS_LOG_DBG("MDTM: no more data to read");
>>> + break;
>>> + }
>>> + if (recd_bytes == 0) {
>>> + m_MDS_LOG_DBG("MDTM: recd bytes=0 on received on sock, abnormal/unknown/hack condition. Ignoring");
>>> + break;
>>> + }
>>> + data = inbuf;
>>> +
>>> + recd_buf_len = ncs_decode_16bit(&data);
>>> +
>>> + if (pfd[FD_BSRSOCK].revents & POLLERR) {
>>> + m_MDS_LOG_ERR("MDTM: Error Recd:tipc_id=<0x%08x:%u>:errno=0x%08x",
>>> + client_addr.addr.id.node, client_addr.addr.id.ref, errno);
>>> + } else if (recd_buf_len == recd_bytes) {
>>> + uint64_t tipc_id = 0;
>>> + uint32_t buff_dump = 0;
>>> + tipc_id = ((uint64_t)client_addr.addr.id.node) << 32; /* TIPC_ID=<NODE,REF> */
>>> + tipc_id |= client_addr.addr.id.ref;
>>> +
>>> +#ifdef MDS_CHECKSUM_ENABLE_FLAG
>>> + if (inbuf[2] == 1) {
>>> + old_checksum = ((uint16_t)inbuf[3] << 8 | inbuf[4]);
>>> + inbuf[3] = 0;
>>> + inbuf[4] = 0;
>>> + new_checksum = mds_checksum(recd_bytes, inbuf);
>>> +
>>> + if (old_checksum != new_checksum) {
>>> + m_MDS_LOG_ERR
>>> + ("CHECKSUM-MISMATCH:recvd_on_sock=%zd, Tipc_id=0x%llu, Adest = <%08x,%u>",
>>> + recd_bytes, tipc_id,
>>> + m_MDS_GET_NCS_NODE_ID_FROM_TIPC_NODE_ID(client_addr.addr.
>>> + id.node),
>>> + client_addr.addr.id.ref);
>>> + mds_buff_dump(inbuf, recd_bytes, 100);
>>> + osaf_abort();
>>> + break;
>>> + }
>>> + mdtm_process_recv_data(&inbuf[5], recd_bytes - 5, tipc_id, &buff_dump);
>>> + if (buff_dump) {
>>> + m_MDS_LOG_ERR
>>> + ("RECV_DATA_PROCESS:recvd_on_sock=%zd, Tipc_id=0x%llu, Adest = <%08x,%u>",
>>> + recd_bytes, tipc_id,
>>> + m_MDS_GET_NCS_NODE_ID_FROM_TIPC_NODE_ID(client_addr.addr.
>>> + id.node),
>>> + client_addr.addr.id.ref);
>>> + mds_buff_dump(inbuf, recd_bytes, 100);
>>> + }
>>> + } else {
>>> + mdtm_process_recv_data(&inbuf[5], recd_bytes - 5, tipc_id, &buff_dump);
>>> + }
>>> +#else
>>> + mdtm_process_recv_data(&inbuf[2], recd_bytes - 2, tipc_id, &buff_dump);
>>> +#endif
>>> + } else {
>>> + uint64_t tipc_id;
>>> + tipc_id = ((uint64_t)client_addr.addr.id.node) << 32; /* TIPC_ID=<NODE,REF> */
>>> + tipc_id |= client_addr.addr.id.ref;
>>> +
>>> + /* Log message that we are dropping the data */
>>> + m_MDS_LOG_ERR
>>> + ("LEN-MISMATCH:recvd_on_sock=%zd, size_in_mds_hdr=%d, Tipc_id= %"PRIu64", Adest = <%08x,%u>",
>>> + recd_bytes, recd_buf_len, tipc_id,
>>> + m_MDS_GET_NCS_NODE_ID_FROM_TIPC_NODE_ID(client_addr.addr.id.node),
>>> + client_addr.addr.id.ref);
>>> + mds_buff_dump(inbuf, recd_bytes, 100);
>>> + }
>>> + if (++recv_ctr > MAX_RECV_THRESHOLD) {
>>> + break;
>>> + }
>>> }
>>> - data = inbuf;
>>> -
>>> - recd_buf_len = ncs_decode_16bit(&data);
>>> -
>>> - if (pfd[1].revents & POLLERR) {
>>> - m_MDS_LOG_ERR("MDTM: Error Recd:tipc_id=<0x%08x:%u>:errno=0x%08x",
>>> - client_addr.addr.id.node, client_addr.addr.id.ref, errno);
>>> - } else if (recd_buf_len == recd_bytes) {
>>> - uint64_t tipc_id = 0;
>>> - uint32_t buff_dump = 0;
>>> - tipc_id = ((uint64_t)client_addr.addr.id.node) << 32; /* TIPC_ID=<NODE,REF> */
>>> - tipc_id |= client_addr.addr.id.ref;
>>> -
>>> -#ifdef MDS_CHECKSUM_ENABLE_FLAG
>>> - if (inbuf[2] == 1) {
>>> - old_checksum = ((uint16_t)inbuf[3] << 8 | inbuf[4]);
>>> - inbuf[3] = 0;
>>> - inbuf[4] = 0;
>>> - new_checksum = mds_checksum(recd_bytes, inbuf);
>>> -
>>> - if (old_checksum != new_checksum) {
>>> - m_MDS_LOG_ERR
>>> - ("CHECKSUM-MISMATCH:recvd_on_sock=%d, Tipc_id=0x%llu, Adest = <%08x,%u>",
>>> - recd_bytes, tipc_id,
>>> - m_MDS_GET_NCS_NODE_ID_FROM_TIPC_NODE_ID(client_addr.addr.
>>> - id.node),
>>> - client_addr.addr.id.ref);
>>> - mds_buff_dump(inbuf, recd_bytes, 100);
>>> - abort();
>>> - osaf_mutex_unlock_ordie(&gl_mds_library_mutex);
>>> - continue;
>>> - }
>>> - mdtm_process_recv_data(&inbuf[5], recd_bytes - 5, tipc_id, &buff_dump);
>>> - if (buff_dump) {
>>> - m_MDS_LOG_ERR
>>> - ("RECV_DATA_PROCESS:recvd_on_sock=%d, Tipc_id=0x%llu, Adest = <%08x,%u>",
>>> - recd_bytes, tipc_id,
>>> - m_MDS_GET_NCS_NODE_ID_FROM_TIPC_NODE_ID(client_addr.addr.
>>> - id.node),
>>> - client_addr.addr.id.ref);
>>> - mds_buff_dump(inbuf, recd_bytes, 100);
>>> - }
>>> - } else {
>>> - mdtm_process_recv_data(&inbuf[5], recd_bytes - 5, tipc_id, &buff_dump);
>>> - }
>>> -#else
>>> - mdtm_process_recv_data(&inbuf[2], recd_bytes - 2, tipc_id, &buff_dump);
>>> -#endif
>>> - } else {
>>> - uint64_t tipc_id;
>>> - tipc_id = ((uint64_t)client_addr.addr.id.node) << 32; /* TIPC_ID=<NODE,REF> */
>>> - tipc_id |= client_addr.addr.id.ref;
>>> -
>>> - /* Log message that we are dropping the data */
>>> - m_MDS_LOG_ERR
>>> - ("LEN-MISMATCH:recvd_on_sock=%d, size_in_mds_hdr=%d, Tipc_id= %"PRIu64", Adest = <%08x,%u>",
>>> - recd_bytes, recd_buf_len, tipc_id,
>>> - m_MDS_GET_NCS_NODE_ID_FROM_TIPC_NODE_ID(client_addr.addr.id.node),
>>> - client_addr.addr.id.ref);
>>> - mds_buff_dump(inbuf, recd_bytes, 100);
>>> - }
>>> - } else if (pfd[2].revents == POLLIN) {
>>> + }
>>> + if (pfd[FD_TMRFD].revents == POLLIN) {
>>> m_MDS_LOG_INFO("MDTM: Processing Timer mailbox events\n");
>>> /* Check if destroy-event has been processed */
>>> @@ -1396,7 +1439,7 @@ uint32_t mds_mdtm_svc_install_tipc(PW_EN
>>> if (setsockopt(tipc_cb.BSRsock, SOL_TIPC, TIPC_IMPORTANCE, &imp, sizeof(imp)) != 0) {
>>> m_MDS_LOG_ERR("MDTM: Can't set socket option TIPC_IMP err :%s\n", strerror(errno));
>>> - assert(0);
>>> + osafassert(0);
>>> } else {
>>> m_MDS_LOG_INFO("MDTM: Successfully set socket option TIPC_IMP, svc_id = %s(%d)",
>>> get_svc_names(svc_id), svc_id);
>>
>
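The length check in the data path decides whether a message is processed or logged as LEN-MISMATCH. That check, and the way the patch builds `tipc_id`, can be sketched as below. The sketch assumes, without confirmation from the patch, that `ncs_decode_16bit()` reads the first two bytes in network (big-endian) byte order. `decode_be16()`, `frame_length_ok()` and `make_tipc_id()` are hypothetical helpers. Unlike the patch, the sketch also guards against frames shorter than two bytes. In the patch the payload then starts at `inbuf[2]`, or at `inbuf[5]` when MDS_CHECKSUM_ENABLE_FLAG is defined (byte 2 is the checksum flag and bytes 3-4 hold the checksum).

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for ncs_decode_16bit(): assumes the length is
 * stored big-endian (network order) in the first two bytes. */
static uint16_t decode_be16(const uint8_t *p)
{
	return (uint16_t)((p[0] << 8) | p[1]);
}

/* The length carried in the MDS header must equal the number of bytes
 * actually received; otherwise the message would be dropped. */
static bool frame_length_ok(const uint8_t *buf, size_t received)
{
	return received >= 2 && decode_be16(buf) == received;
}

/* TIPC_ID = <NODE,REF>: node in the upper 32 bits, ref in the lower 32. */
static uint64_t make_tipc_id(uint32_t node, uint32_t ref)
{
	return ((uint64_t)node << 32) | ref;
}
```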
_______________________________________________
Opensaf-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/opensaf-devel