Hi Mahesh,
on closer inspection there are some problems with the suggestion below.
The pfd structure is only updated by a new poll() call, i.e. it is
the kernel that updates
this structure. Checking pfd inside the FD_BSRSOCK loop will therefore
probably give the old behaviour
of breaking out of the loop after one iteration. I think we have to consider
reverting to the counter
and switching to the mailbox instead of the callbacks.
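
To illustrate the point about pfd, here is a minimal sketch (not code from
the patch) that uses a pipe in place of the TIPC sockets. The names
`snapshot_revents` and `struct revents_snapshot` are hypothetical and exist
only for this example. It records revents right after poll(), again after
the descriptor has been drained without a new poll(), and once more after a
second poll().

```c
#include <poll.h>
#include <unistd.h>

/* Hypothetical helper type for this example only. */
struct revents_snapshot {
	short after_poll;   /* revents right after the first poll() */
	short after_drain;  /* revents after read() emptied the pipe, no new poll() */
	short after_repoll; /* revents after a second, zero-timeout poll() */
};

/* Returns 0 on success and fills *out, -1 if any system call fails. */
int snapshot_revents(struct revents_snapshot *out)
{
	int fds[2];
	char byte = 'x';
	int rc = -1;

	if (pipe(fds) != 0)
		return -1;

	struct pollfd pfd = { .fd = fds[0], .events = POLLIN, .revents = 0 };

	/* Make the read end readable, then let the kernel fill in revents. */
	if (write(fds[1], &byte, 1) != 1 || poll(&pfd, 1, 0) != 1)
		goto done;
	out->after_poll = pfd.revents;

	/* Drain the pipe. Nothing updates pfd here, so revents is unchanged. */
	if (read(fds[0], &byte, 1) != 1)
		goto done;
	out->after_drain = pfd.revents;

	/* Only a new poll() call refreshes revents; the pipe is empty now. */
	if (poll(&pfd, 1, 0) != 0)
		goto done;
	out->after_repoll = pfd.revents;
	rc = 0;
done:
	close(fds[0]);
	close(fds[1]);
	return rc;
}
```

After a successful call, after_poll and after_drain should both have POLLIN
set even though the pipe is already empty at the second reading, while
after_repoll should not.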
/Regards HansN
On 12/07/2016 01:32 PM, Hans Nordeback wrote:
> Hi Mahesh,
>
> great, the suggestion below looks good. I will send out a V4 shortly.
>
> /Thanks HansN
>
>
> On 12/07/2016 05:29 AM, A V Mahesh wrote:
>> Hi HansN,
>>
>> I did some investigation. My first observation is that the issues occur
>> because of delayed processing of control events (FD_DSOCK = POLLIN),
>> timer events (FD_TMRFD = POLLIN), and
>> the discovery socket losing connectivity for topology events
>> (FD_DSOCK = POLLHUP). This happens while we are held
>> up in the data message processing loop of 1024 messages,
>> so the fix is to break out of the data message processing loop if
>> priority control, timer, or connectivity-lost events are pending.
>>
>> So we need to replace the key logic of `message counter and break out
>> of the receive loop after we have received
>> 1024 messages, even if there are still more messages on the socket`
>> with alternate logic. The new logic will also address the out-of-order
>> delivery caused by the MDS library.
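
One way to test for pending priority events from inside the data loop,
without relying on the revents values left by the outer poll() call, is a
zero-timeout poll() on just the priority descriptors. The sketch below shows
that idea under stated assumptions: it is not what the patch in this thread
does, `priority_event_pending` and `MAX_PRIORITY_FDS` are hypothetical names,
and whether the extra system call per message is acceptable has not been
measured.

```c
#include <poll.h>
#include <stdbool.h>

/* Hypothetical upper bound on the number of priority descriptors. */
#define MAX_PRIORITY_FDS 8

/* Returns true if any of the given descriptors is readable or hung up
 * right now. Uses a fresh zero-timeout poll(), so it never blocks. */
bool priority_event_pending(const int *fds, int nfds)
{
	struct pollfd pfd[MAX_PRIORITY_FDS];

	if (nfds <= 0 || nfds > MAX_PRIORITY_FDS)
		return false;

	for (int i = 0; i < nfds; i++) {
		pfd[i].fd = fds[i];
		pfd[i].events = POLLIN;
		pfd[i].revents = 0;
	}

	/* Timeout 0: report the current state and return immediately. */
	if (poll(pfd, (nfds_t)nfds, 0) <= 0)
		return false;

	for (int i = 0; i < nfds; i++) {
		if (pfd[i].revents & (POLLIN | POLLHUP))
			return true;
	}
	return false;
}
```

In the receive loop this could be called with the discovery socket and the
timer descriptor, breaking out of the data loop when it returns true.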
>>
>> Please make the following changes, rebase, and send a new version of
>> the patch, so that I can continue testing and we can
>> conclude this patch.
>> If you are busy with some other ticket, please let me know and I will
>> drive the patch.
>>
>> 1) replace this with below :
>> ===============================================================================================
>>
>>
>> On 11/22/2016 3:07 PM, Hans Nordeback wrote:
>> > + mds_buff_dump(inbuf, recd_bytes, 100);
>> > + }
>> > + if (++recv_ctr > MAX_RECV_THRESHOLD) {
>> > + break;
>> > + }
>> ===============================================================================================
>>
>>
>>
>> ===============================================================================================
>>
>>
>> + mds_buff_dump(inbuf, recd_bytes, 100);
>> + }
>> + if ((pfd[FD_DSOCK].revents == POLLIN) ||
>> + (pfd[FD_DSOCK].revents & POLLHUP) ||
>> + (pfd[FD_TMRFD].revents == POLLIN)) {
>> + m_MDS_LOG_INFO("MDTM: break (pfd[FD_BSRSOCK].revents & POLLIN) Loop..");
>> + break;
>> + }
>> ===============================================================================================
>>
>>
>>
>> 2) replace this with below :
>> ===============================================================================================
>>
>>
>> On 11/22/2016 3:07 PM, Hans Nordeback wrote:
>> - assert(0);
>> + }
>> + if (++recv_ctr > MAX_RECV_THRESHOLD) {
>> + break;
>> }
>> ===============================================================================================
>>
>>
>>
>> ===============================================================================================
>>
>>
>> - assert(0);
>> + }
>> + if (pfd[FD_DSOCK].revents & POLLHUP) {
>> + m_MDS_LOG_INFO("MDTM: break (pfd[FD_DSOCK].revents == POLLIN) Loop..");
>> + break;
>> }
>> ===============================================================================================
>>
>>
>>
>> 3) Remove the `+#define MAX_RECV_THRESHOLD 1024` macro and the
>> `+ unsigned int recv_ctr = 0` variable
>>
>> -AVM
>>
>>
>> On 11/23/2016 11:07 AM, A V Mahesh wrote:
>>> Hi HansN
>>>
>>> In my testing with a concurrent application that sends large data and
>>> a large number of up/down events,
>>> the cluster is rebooting because some MDS applications are
>>> not getting a chance to process within the expected time (see the logs
>>> below), as I expected,
>>> for example `osafamfnd[18568]: ER AMF director heart beat timeout,
>>> generating core for amfd` &
>>> `osaffmd[20377]: Rebooting OpenSAF NodeId = 0 EE Name = No EE
>>> Mapped, Reason: Activation timer supervision expired: no ACTIVE
>>> assignment received within the time limit,`.
>>> Unfortunately, no MDS logs are being written at the
>>> time the event occurs (logging got stuck some time earlier),
>>> so please hold this patch. I will find a way to root-cause the
>>> problem by doing different kinds of testing.
>>>
>>> ==============================================================================================
>>>
>>>
>>>
>>> Nov 23 10:46:27 SC-2 osafimmnd[18324]: NO Ccb 4807 COMMITTED
>>> (immcfg_PL-4_32406)
>>> Nov 23 10:46:27 SC-2 osafimmnd[18324]: NO Ccb 4808 COMMITTED
>>> (immcfg_PL-4_32409)
>>> Nov 23 10:46:27 SC-2 osafimmnd[18324]: NO Ccb 4809 COMMITTED
>>> (immcfg_PL-4_32412)
>>> Nov 23 10:46:27 SC-2 osafimmnd[18324]: NO Ccb 4810 COMMITTED
>>> (immcfg_PL-4_32415)
>>> Nov 23 10:46:27 SC-2 osafimmnd[18324]: NO Ccb 4811 COMMITTED
>>> (immcfg_PL-4_32418)
>>> Nov 23 10:46:27 SC-2 osafamfnd[18568]: ER AMF director heart beat
>>> timeout, generating core for amfd
>>> Nov 23 10:46:28 SC-2 osafamfnd[18568]: Rebooting OpenSAF NodeId =
>>> 131599 EE Name = , Reason: AMF director heart beat timeout,
>>> OwnNodeId = 131599, SupervisionTime = 60
>>> Nov 23 10:46:28 SC-2 opensaf_reboot: Rebooting local node; timeout=60
>>> Nov 23 10:46:29 SC-2 osafimmnd[18324]: WA MDS Send Failed
>>> Nov 23 10:46:29 SC-2 osafimmnd[18324]: WA Error code 2 returned for
>>> message type 16 - ignoring
>>> Nov 23 10:47:09 SC-2 syslog-ng[1256]: syslog-ng starting up;
>>> version='2.0.9'
>>> Nov 23 10:47:10 SC-2 ifup: lo
>>>
>>> Nov 23 10:46:22 SC-1 osafimmnd[20398]: NO Ccb 241 COMMITTED
>>> (immcfg_PL-4_18950)
>>> Nov 23 10:46:22 SC-1 osafimmnd[20398]: NO Ccb 242 COMMITTED
>>> (immcfg_PL-4_18953)
>>> Nov 23 10:46:22 SC-1 osafimmnd[20398]: NO Ccb 243 COMMITTED
>>> (immcfg_PL-4_18956)
>>> Nov 23 10:46:22 SC-1 osafimmnd[20398]: NO Ccb 244 COMMITTED
>>> (immcfg_PL-4_18959)
>>> Nov 23 10:46:22 SC-1 osaffmd[20377]: Rebooting OpenSAF NodeId = 0 EE
>>> Name = No EE Mapped, Reason: Activation timer supervision expired:
>>> no ACTIVE assignment received within the time limit, OwnNodeId =
>>> 131343, SupervisionTime = 60
>>> Nov 23 10:46:22 SC-1 osafimmnd[20398]: NO Ccb 245 COMMITTED
>>> (immcfg_PL-4_18962)
>>> Nov 23 10:46:22 SC-1 osafimmnd[20398]: NO Ccb 246 COMMITTED
>>> (immcfg_PL-4_18965)
>>> Nov 23 10:46:22 SC-1 osafimmnd[20398]: NO Ccb 247 COMMITTED
>>> (immcfg_PL-4_18968)
>>> Nov 23 10:46:22 SC-1 osafimmnd[20398]: NO Ccb 255 COMMITTED
>>> (immcfg_PL-4_18989)
>>> Nov 23 10:46:22 SC-1 osafimmnd[20398]: NO Ccb 256 COMMITTED
>>> (immcfg_PL-4_18992)
>>> Nov 23 10:46:22 SC-1 osafimmnd[20398]: NO Ccb 257 COMMITTED
>>> (immcfg_PL-4_18995)
>>> Nov 23 10:46:22 SC-1 osafimmnd[20398]: NO Ccb 258 COMMITTED
>>> (immcfg_PL-4_18998)
>>> Nov 23 10:46:22 SC-1 osafimmnd[20398]: NO Ccb 251 COMMITTED
>>> (immcfg_SC-1_20807)
>>> Nov 23 10:46:22 SC-1 opensaf_reboot: Rebooting local node; timeout=60
>>>
>>> ==============================================================================================
>>>
>>>
>>>
>>> -AVM
>>>
>>>
>>> On 11/22/2016 3:07 PM, Hans Nordeback wrote:
>>>> osaf/libs/core/mds/mds_dt_tipc.c | 389
>>>> +++++++++++++++++++++-----------------
>>>> 1 files changed, 216 insertions(+), 173 deletions(-)
>>>>
>>>>
>>>> This first part makes the socket descriptors non-blocking at
>>>> receive,
>>>> so that all incoming messages are read before returning to the next
>>>> poll call.
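
The drain-until-empty pattern described above, combined with a message
counter, can be sketched as follows. This is a simplified illustration and
not the patch itself: `drain_bounded` and its `max_msgs` parameter are
hypothetical, the received data is simply discarded, and a plain recv() is
used instead of the recvmsg()-based helper in mds_dt_tipc.c.

```c
#include <errno.h>
#include <sys/socket.h>
#include <sys/types.h>

/* Reads at most max_msgs messages from sd without blocking and returns
 * how many were read. Stops early when the socket has no more data. */
unsigned int drain_bounded(int sd, unsigned int max_msgs)
{
	char buf[256];
	unsigned int count = 0;

	while (count < max_msgs) {
		/* MSG_DONTWAIT makes this single call non-blocking. */
		ssize_t rc = recv(sd, buf, sizeof(buf), MSG_DONTWAIT);
		if (rc == -1) {
			if (errno == EINTR)
				continue; /* interrupted by a signal, retry */
			/* EAGAIN/EWOULDBLOCK: nothing left to read.
			 * Any other error: give up on this round. */
			break;
		}
		count++; /* one message consumed (real code would process it) */
	}
	return count;
}
```

The bound is what lets the caller return to poll() and service other
descriptors even when the sender keeps the socket permanently non-empty.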
>>>>
>>>> diff --git a/osaf/libs/core/mds/mds_dt_tipc.c
>>>> b/osaf/libs/core/mds/mds_dt_tipc.c
>>>> --- a/osaf/libs/core/mds/mds_dt_tipc.c
>>>> +++ b/osaf/libs/core/mds/mds_dt_tipc.c
>>>> @@ -142,6 +142,8 @@ static MDS_SUBTN_REF_VAL handle;
>>>> static uint16_t num_subscriptions;
>>>> uint32_t mdtm_global_frag_num;
>>>> +
>>>> +#define MAX_RECV_THRESHOLD 1024
>>>> /*********************************************************
>>>> Function NAME: mdtm_tipc_init
>>>> @@ -541,15 +543,13 @@ static uint32_t mdtm_destroy_rcv_task(vo
>>>> ssize_t recvfrom_connectionless (int sd, void *buf, size_t
>>>> nbytes, int flags,
>>>> struct sockaddr *from, socklen_t *addrlen)
>>>> {
>>>> - struct msghdr msg;
>>>> - struct iovec iov;
>>>> + struct msghdr msg = {0};
>>>> + struct iovec iov = {0};
>>>> char anc_buf[CMSG_SPACE(8) + CMSG_SPACE(1024) + CMSG_SPACE(12)];
>>>> struct cmsghdr *anc;
>>>> int has_addr;
>>>> int anc_data[2];
>>>> - ssize_t sz;
>>>> -
>>>> has_addr = (from != NULL) && (addrlen != NULL);
>>>> iov.iov_base = buf;
>>>> @@ -562,55 +562,63 @@ ssize_t recvfrom_connectionless (int sd,
>>>> msg.msg_control = anc_buf;
>>>> msg.msg_controllen = sizeof(anc_buf);
>>>> - sz = recvmsg(sd, &msg, flags);
>>>> - if (sz >= 0) {
>>>> - anc = CMSG_FIRSTHDR(&msg);
>>>> - if (anc == NULL) {
>>>> - m_MDS_LOG_DBG("MDTM: size: %d anc is NULL", (int)sz);
>>>> + while (true) {
>>>> + ssize_t sz = recvmsg(sd, &msg, flags);
>>>> + if (sz == -1) {
>>>> + if (errno == EAGAIN || errno == EWOULDBLOCK) {
>>>> + return sz;
>>>> + } else if (errno == EINTR) {
>>>> + continue;
>>>> + } else {
>>>> + /* -1 indicates connection termination
>>>> connectionless this not possible */
>>>> + osaf_abort(sz);
>>>> + }
>>>> }
>>>> - while (anc != NULL) {
>>>> -
>>>> - /* Receipt of a normal data message never creates the
>>>> TIPC_ERRINFO
>>>> - and TIPC_RETDATA objects, and only creates the
>>>> TIPC_DESTNAME object
>>>> - if the message was sent using a TIPC name or name
>>>> sequence as the
>>>> - destination rather than a TIPC port ID*/
>>>> - if (anc->cmsg_type == TIPC_ERRINFO) {
>>>> - anc_data[0] = *((unsigned int*)(CMSG_DATA(anc) + 0));
>>>> - if (anc_data[0] == TIPC_ERR_OVERLOAD) {
>>>> - LOG_CR("MDTM: undelivered message condition
>>>> ancillary data: TIPC_ERR_OVERLOAD");
>>>> - m_MDS_LOG_CRITICAL("MDTM: undelivered message
>>>> condition ancillary data: TIPC_ERR_OVERLOAD");
>>>> + if (sz >= 0) {
>>>> + anc = CMSG_FIRSTHDR(&msg);
>>>> + if (anc == NULL) {
>>>> + m_MDS_LOG_DBG("MDTM: size: %d anc is NULL",
>>>> (int)sz);
>>>> + }
>>>> + while (anc != NULL) {
>>>> +
>>>> + /* Receipt of a normal data message never creates
>>>> the TIPC_ERRINFO
>>>> + and TIPC_RETDATA objects, and only creates the
>>>> TIPC_DESTNAME object
>>>> + if the message was sent using a TIPC name or
>>>> name sequence as the
>>>> + destination rather than a TIPC port ID*/
>>>> + if (anc->cmsg_type == TIPC_ERRINFO) {
>>>> + anc_data[0] = *((unsigned int*)(CMSG_DATA(anc)
>>>> + 0));
>>>> + if (anc_data[0] == TIPC_ERR_OVERLOAD) {
>>>> + LOG_CR("MDTM: undelivered message
>>>> condition ancillary data: TIPC_ERR_OVERLOAD");
>>>> + m_MDS_LOG_CRITICAL("MDTM: undelivered
>>>> message condition ancillary data: TIPC_ERR_OVERLOAD");
>>>> + } else {
>>>> + /* TIPC_ERRINFO - TIPC error code
>>>> associated with a returned data message or a connection termination
>>>> message */
>>>> + m_MDS_LOG_DBG("MDTM: undelivered message
>>>> condition ancillary data: TIPC_ERRINFO err : %d", anc_data[0]);
>>>> + }
>>>> + } else if (anc->cmsg_type == TIPC_RETDATA) {
>>>> + /* If we set TIPC_DEST_DROPPABLE off message
>>>> (configure TIPC to return rejected messages to the sender )
>>>> + we will hit this when we implement MDS
>>>> retransmit lost messages, can be replaced with flow control logic */
>>>> + /* TIPC_RETDATA -The contents of a returned
>>>> data message */
>>>> + LOG_CR("MDTM: undelivered message condition
>>>> ancillary data: TIPC_RETDATA");
>>>> + m_MDS_LOG_CRITICAL("MDTM: undelivered message
>>>> condition ancillary data: TIPC_RETDATA");
>>>> + } else if (anc->cmsg_type == TIPC_DESTNAME) {
>>>> + if (sz == 0) {
>>>> + m_MDS_LOG_DBG("MDTM: recd bytes=0 on
>>>> received on sock, abnormal/unknown condition. Ignoring");
>>>> + }
>>>> } else {
>>>> - /* TIPC_ERRINFO - TIPC error code associated
>>>> with a returned data message or a connection termination message */
>>>> - m_MDS_LOG_DBG("MDTM: undelivered message
>>>> condition ancillary data: TIPC_ERRINFO err : %d", anc_data[0]);
>>>> + m_MDS_LOG_INFO("MDTM: unrecognized ancillary
>>>> data type %u\n", anc->cmsg_type);
>>>> + if (sz == 0) {
>>>> + m_MDS_LOG_DBG("MDTM: recd bytes=0 on
>>>> received on sock, abnormal/unkown condition. Ignoring");
>>>> + }
>>>> }
>>>> - } else if (anc->cmsg_type == TIPC_RETDATA) {
>>>> - /* If we set TIPC_DEST_DROPPABLE off message
>>>> (configure TIPC to return rejected messages to the sender )
>>>> - we will hit this when we implement MDS
>>>> retransmit lost messages, can be replaced with flow control logic */
>>>> - /* TIPC_RETDATA -The contents of a returned data
>>>> message */
>>>> - LOG_CR("MDTM: undelivered message condition
>>>> ancillary data: TIPC_RETDATA");
>>>> - m_MDS_LOG_CRITICAL("MDTM: undelivered message
>>>> condition ancillary data: TIPC_RETDATA");
>>>> - } else if (anc->cmsg_type == TIPC_DESTNAME) {
>>>> - if (sz == 0) {
>>>> - m_MDS_LOG_DBG("MDTM: recd bytes=0 on received
>>>> on sock, abnormal/unknown condition. Ignoring");
>>>> - }
>>>> - } else {
>>>> - m_MDS_LOG_INFO("MDTM: unrecognized ancillary data
>>>> type %u\n", anc->cmsg_type);
>>>> - if (sz == 0) {
>>>> - m_MDS_LOG_DBG("MDTM: recd bytes=0 on received
>>>> on sock, abnormal/unkown condition. Ignoring");
>>>> - }
>>>> +
>>>> + anc = CMSG_NXTHDR(&msg, anc);
>>>> }
>>>> - anc = CMSG_NXTHDR(&msg, anc);
>>>> + if (has_addr)
>>>> + *addrlen = msg.msg_namelen;
>>>> }
>>>> -
>>>> - if (has_addr)
>>>> - *addrlen = msg.msg_namelen;
>>>> - } else {
>>>> - /* -1 indicates connection termination connectionless this
>>>> not possible */
>>>> - abort();
>>>> + return sz;
>>>> }
>>>> -
>>>> - return sz;
>>>> }
>>>> /*********************************************************
>>>> @@ -627,91 +635,117 @@ ssize_t recvfrom_connectionless (int sd,
>>>> *********************************************************/
>>>> static uint32_t mdtm_process_recv_events(void)
>>>> {
>>>> + enum {
>>>> + FD_DSOCK = 0,
>>>> + FD_BSRSOCK,
>>>> + FD_TMRFD,
>>>> + NUM_FDS
>>>> + };
>>>> +
>>>> /*
>>>> STEP 1: Poll on the BSRsock and Dsock to get the events
>>>> if data is received process the received data
>>>> if discovery events are received , process the discovery
>>>> events
>>>> */
>>>> - while (1) {
>>>> + while (true) {
>>>> + unsigned int recv_ctr = 0;
>>>> unsigned int pollres;
>>>> - struct pollfd pfd[3];
>>>> + struct pollfd pfd[NUM_FDS] = {{0}};
>>>> struct tipc_event event;
>>>> - pfd[0].fd = tipc_cb.Dsock;
>>>> - pfd[0].events = POLLIN;
>>>> - pfd[1].fd = tipc_cb.BSRsock;
>>>> - pfd[1].events = POLLIN;
>>>> - pfd[2].fd = tipc_cb.tmr_fd;
>>>> - pfd[2].events = POLLIN;
>>>> -
>>>> - pfd[0].revents = pfd[1].revents = pfd[2].revents = 0;
>>>> -
>>>> - pollres = poll(pfd, 3, MDTM_TIPC_POLL_TIMEOUT);
>>>> + pfd[FD_DSOCK].fd = tipc_cb.Dsock;
>>>> + pfd[FD_DSOCK].events = POLLIN;
>>>> + pfd[FD_BSRSOCK].fd = tipc_cb.BSRsock;
>>>> + pfd[FD_BSRSOCK].events = POLLIN;
>>>> + pfd[FD_TMRFD].fd = tipc_cb.tmr_fd;
>>>> + pfd[FD_TMRFD].events = POLLIN;
>>>> +
>>>> + pfd[FD_DSOCK].revents = pfd[FD_BSRSOCK].revents =
>>>> pfd[FD_TMRFD].revents = 0;
>>>> +
>>>> + pollres = poll(pfd, NUM_FDS, MDTM_TIPC_POLL_TIMEOUT);
>>>> if (pollres > 0) { /* Check for EINTR and discard */
>>>> memset(&event, 0, sizeof(event));
>>>> osaf_mutex_lock_ordie(&gl_mds_library_mutex);
>>>> - if (pfd[0].revents == POLLIN) {
>>>> - if (recv(tipc_cb.Dsock, &event, sizeof(event), 0)
>>>> != sizeof(event)) {
>>>> - m_MDS_LOG_ERR("Unable to capture the recd
>>>> event .. Continuing err :%s", strerror(errno));
>>>> - osaf_mutex_unlock_ordie(&gl_mds_library_mutex);
>>>> - continue;
>>>> - } else {
>>>> - if (NTOHL(event.event) == TIPC_PUBLISHED) {
>>>> -
>>>> - m_MDS_LOG_INFO("MDTM: Published: ");
>>>> - m_MDS_LOG_INFO("MDTM: <%u,%u,%u> port id
>>>> <0x%08x:%u>\n",
>>>> - NTOHL(event.s.seq.type),
>>>> NTOHL(event.found_lower),
>>>> - NTOHL(event.found_upper), NTOHL(event.port.node),
>>>> - NTOHL(event.port.ref));
>>>> -
>>>> - if ( NCSCC_RC_SUCCESS !=
>>>> mdtm_process_discovery_events(TIPC_PUBLISHED,
>>>> - event)) {
>>>> - m_MDS_LOG_INFO("MDTM: Published Event
>>>> processing status: F");
>>>> + if (pfd[FD_DSOCK].revents == POLLIN) {
>>>> + recv_ctr = 0;
>>>> + while (true) {
>>>> + ssize_t rc = recv(tipc_cb.Dsock, &event,
>>>> sizeof(event), MSG_DONTWAIT);
>>>> + if (rc == -1) {
>>>> + if (errno == EAGAIN || errno ==
>>>> EWOULDBLOCK) {
>>>> + break;
>>>> + } else if (errno == EINTR) {
>>>> + continue;
>>>> + } else {
>>>> + m_MDS_LOG_ERR("Unable to capture the
>>>> recd event .. Continuing err :%s", strerror(errno));
>>>> + break;
>>>> }
>>>> - } else if (NTOHL(event.event) ==
>>>> TIPC_WITHDRAWN) {
>>>> - m_MDS_LOG_INFO("MDTM: Withdrawn: ");
>>>> - m_MDS_LOG_INFO("MDTM: <%u,%u,%u> port id
>>>> <0x%08x:%u>\n",
>>>> - NTOHL(event.s.seq.type),
>>>> NTOHL(event.found_lower),
>>>> - NTOHL(event.found_upper), NTOHL(event.port.node),
>>>> - NTOHL(event.port.ref));
>>>> -
>>>> - if ( NCSCC_RC_SUCCESS !=
>>>> mdtm_process_discovery_events(TIPC_WITHDRAWN,
>>>> - event)) {
>>>> - m_MDS_LOG_INFO("MDTM: Withdrawn event
>>>> processing status: F");
>>>> + } else {
>>>> + if (rc != sizeof(event)) {
>>>> + m_MDS_LOG_ERR("Unable to capture the
>>>> recd event .. Continuing err :%s", strerror(errno));
>>>> + break;
>>>> + } else {
>>>> + if (NTOHL(event.event) ==
>>>> TIPC_PUBLISHED) {
>>>> +
>>>> + m_MDS_LOG_INFO("MDTM: Published: ");
>>>> + m_MDS_LOG_INFO("MDTM: <%u,%u,%u>
>>>> port id <0x%08x:%u>\n",
>>>> + NTOHL(event.s.seq.type), NTOHL(event.found_lower),
>>>> + NTOHL(event.found_upper), NTOHL(event.port.node),
>>>> + NTOHL(event.port.ref));
>>>> +
>>>> + if (NCSCC_RC_SUCCESS !=
>>>> mdtm_process_discovery_events(TIPC_PUBLISHED,
>>>> + event)) {
>>>> + m_MDS_LOG_INFO("MDTM:
>>>> Published Event processing status: F");
>>>> + }
>>>> + } else if (NTOHL(event.event) ==
>>>> TIPC_WITHDRAWN) {
>>>> + m_MDS_LOG_INFO("MDTM: Withdrawn: ");
>>>> + m_MDS_LOG_INFO("MDTM: <%u,%u,%u>
>>>> port id <0x%08x:%u>\n",
>>>> + NTOHL(event.s.seq.type), NTOHL(event.found_lower),
>>>> + NTOHL(event.found_upper), NTOHL(event.port.node),
>>>> + NTOHL(event.port.ref));
>>>> +
>>>> + if (NCSCC_RC_SUCCESS !=
>>>> mdtm_process_discovery_events(TIPC_WITHDRAWN,
>>>> + event)) {
>>>> + m_MDS_LOG_INFO("MDTM:
>>>> Withdrawn event processing status: F");
>>>> + }
>>>> + } else if (NTOHL(event.event) ==
>>>> TIPC_SUBSCR_TIMEOUT) {
>>>> + /* As the timeout passed in
>>>> infinite, No need to check for the Timeout */
>>>> + m_MDS_LOG_ERR("MDTM: Timeou Event");
>>>> + m_MDS_LOG_INFO("MDTM: Timeou Event");
>>>> + m_MDS_LOG_INFO("MDTM: <%u,%u,%u>
>>>> port id <0x%08x:%u>\n",
>>>> + NTOHL(event.s.seq.type), NTOHL(event.found_lower),
>>>> + NTOHL(event.found_upper), NTOHL(event.port.node),
>>>> + NTOHL(event.port.ref));
>>>> + } else {
>>>> + m_MDS_LOG_ERR("MDTM: Unknown Event");
>>>> + /* This should never come */
>>>> + osafassert(0);
>>>> + }
>>>> }
>>>> - } else if (NTOHL(event.event) ==
>>>> TIPC_SUBSCR_TIMEOUT) {
>>>> - /* As the timeout passed in infinite, No
>>>> need to check for the Timeout */
>>>> - m_MDS_LOG_ERR("MDTM: Timeou Event");
>>>> - m_MDS_LOG_INFO("MDTM: Timeou Event");
>>>> - m_MDS_LOG_INFO("MDTM: <%u,%u,%u> port id
>>>> <0x%08x:%u>\n",
>>>> - NTOHL(event.s.seq.type),
>>>> NTOHL(event.found_lower),
>>>> - NTOHL(event.found_upper), NTOHL(event.port.node),
>>>> - NTOHL(event.port.ref));
>>>> - } else {
>>>> - m_MDS_LOG_ERR("MDTM: Unknown Event");
>>>> - /* This should never come */
>>>> - assert(0);
>>>> + }
>>>> + if (++recv_ctr > MAX_RECV_THRESHOLD) {
>>>> + break;
>>>> }
>>>> }
>>>> - } else if (pfd[0].revents & POLLHUP) {
>>>> + }
>>>> + if (pfd[FD_DSOCK].revents & POLLHUP) {
>>>> /* This value is returned when the number of
>>>> subscriptions made cross the tipc max_subscr limit,
>>>> so no more connection to the tipc topserver is
>>>> present(viz no more up/down events),
>>>> so abort and exit the process */
>>>> - m_MDS_LOG_CRITICAL("MDTM: POLLHUP returned on
>>>> Discovery Socket, No. of subscriptions=%d",
>>>> - num_subscriptions);
>>>> - abort(); /* This means, the process is use less as
>>>> + m_MDS_LOG_CRITICAL("MDTM: POLLHUP returned on
>>>> Discovery Socket, No. of subscriptions=%d",
>>>> + num_subscriptions);
>>>> + osaf_abort(pfd[FD_DSOCK].revents); /* This
>>>> means, the process is use less as
>>>> it has lost the connectivity with the
>>>> topology server
>>>> and will not be able to receive any
>>>> UP/DOWN events */
>>>> - } else if (pfd[1].revents & POLLIN) {
>>>> + }
>>>> + if (pfd[FD_BSRSOCK].revents & POLLIN) {
>>>> /* Data Received */
>>>> uint8_t *inbuf = tipc_cb.recvbuf;
>>>> uint8_t *data; /* Used for decoding */
>>>> - uint16_t recd_bytes = 0;
>>>> #ifdef MDS_CHECKSUM_ENABLE_FLAG
>>>> uint16_t old_checksum = 0;
>>>> uint16_t new_checksum = 0;
>>>> @@ -719,78 +753,87 @@ static uint32_t mdtm_process_recv_events
>>>> struct sockaddr_tipc client_addr;
>>>> socklen_t alen = sizeof(client_addr);
>>>> - uint16_t recd_buf_len = 0;
>>>> m_MDS_LOG_INFO("MDTM: Data received: Processing
>>>> data ");
>>>> - recd_bytes =
>>>> recvfrom_connectionless(tipc_cb.BSRsock, inbuf,
>>>> TIPC_MAX_USER_MSG_SIZE, 0,
>>>> - (struct sockaddr *)&client_addr, &alen);
>>>> - if (recd_bytes == 0) {
>>>> - m_MDS_LOG_DBG("MDTM: recd bytes=0 on received
>>>> on sock, abnormal/unknown/hack condition. Ignoring");
>>>> - osaf_mutex_unlock_ordie(&gl_mds_library_mutex);
>>>> - continue;
>>>> + recv_ctr = 0;
>>>> + while (true) {
>>>> + uint16_t recd_buf_len = 0;
>>>> + ssize_t recd_bytes =
>>>> recvfrom_connectionless(tipc_cb.BSRsock, inbuf,
>>>> TIPC_MAX_USER_MSG_SIZE, MSG_DONTWAIT,
>>>> + (struct sockaddr
>>>> *)&client_addr, &alen);
>>>> + if (recd_bytes == -1) {
>>>> + m_MDS_LOG_DBG("MDTM: no more data to read");
>>>> + break;
>>>> + }
>>>> + if (recd_bytes == 0) {
>>>> + m_MDS_LOG_DBG("MDTM: recd bytes=0 on
>>>> received on sock, abnormal/unknown/hack condition. Ignoring");
>>>> + break;
>>>> + }
>>>> + data = inbuf;
>>>> +
>>>> + recd_buf_len = ncs_decode_16bit(&data);
>>>> +
>>>> + if (pfd[FD_BSRSOCK].revents & POLLERR) {
>>>> + m_MDS_LOG_ERR("MDTM: Error
>>>> Recd:tipc_id=<0x%08x:%u>:errno=0x%08x",
>>>> + client_addr.addr.id.node,
>>>> client_addr.addr.id.ref, errno);
>>>> + } else if (recd_buf_len == recd_bytes) {
>>>> + uint64_t tipc_id = 0;
>>>> + uint32_t buff_dump = 0;
>>>> + tipc_id =
>>>> ((uint64_t)client_addr.addr.id.node) << 32; /*
>>>> TIPC_ID=<NODE,REF> */
>>>> + tipc_id |= client_addr.addr.id.ref;
>>>> +
>>>> +#ifdef MDS_CHECKSUM_ENABLE_FLAG
>>>> + if (inbuf[2] == 1) {
>>>> + old_checksum = ((uint16_t)inbuf[3] <<
>>>> 8 | inbuf[4]);
>>>> + inbuf[3] = 0;
>>>> + inbuf[4] = 0;
>>>> + new_checksum =
>>>> mds_checksum(recd_bytes, inbuf);
>>>> +
>>>> + if (old_checksum != new_checksum) {
>>>> + m_MDS_LOG_ERR
>>>> + ("CHECKSUM-MISMATCH:recvd_on_sock=%zd, Tipc_id=0x%llu, Adest =
>>>> <%08x,%u>",
>>>> + recd_bytes, tipc_id,
>>>> + m_MDS_GET_NCS_NODE_ID_FROM_TIPC_NODE_ID(client_addr.addr.
>>>> + id.node),
>>>> + client_addr.addr.id.ref);
>>>> + mds_buff_dump(inbuf, recd_bytes,
>>>> 100);
>>>> + osaf_abort();
>>>> + break;
>>>> + }
>>>> + mdtm_process_recv_data(&inbuf[5], recd_bytes - 5, tipc_id,
>>>> &buff_dump);
>>>> + if (buff_dump) {
>>>> + m_MDS_LOG_ERR
>>>> + ("RECV_DATA_PROCESS:recvd_on_sock=%zd, Tipc_id=0x%llu, Adest =
>>>> <%08x,%u>",
>>>> + recd_bytes, tipc_id,
>>>> + m_MDS_GET_NCS_NODE_ID_FROM_TIPC_NODE_ID(client_addr.addr.
>>>> + id.node),
>>>> + client_addr.addr.id.ref);
>>>> + mds_buff_dump(inbuf, recd_bytes,
>>>> 100);
>>>> + }
>>>> + } else {
>>>> + mdtm_process_recv_data(&inbuf[5], recd_bytes - 5, tipc_id,
>>>> &buff_dump);
>>>> + }
>>>> +#else
>>>> + mdtm_process_recv_data(&inbuf[2], recd_bytes - 2, tipc_id,
>>>> &buff_dump);
>>>> +#endif
>>>> + } else {
>>>> + uint64_t tipc_id;
>>>> + tipc_id =
>>>> ((uint64_t)client_addr.addr.id.node) << 32; /*
>>>> TIPC_ID=<NODE,REF> */
>>>> + tipc_id |= client_addr.addr.id.ref;
>>>> +
>>>> + /* Log message that we are dropping the
>>>> data */
>>>> + m_MDS_LOG_ERR
>>>> + ("LEN-MISMATCH:recvd_on_sock=%zd, size_in_mds_hdr=%d, Tipc_id=
>>>> %"PRIu64", Adest = <%08x,%u>",
>>>> + recd_bytes, recd_buf_len, tipc_id,
>>>> + m_MDS_GET_NCS_NODE_ID_FROM_TIPC_NODE_ID(client_addr.addr.id.node),
>>>> + client_addr.addr.id.ref);
>>>> + mds_buff_dump(inbuf, recd_bytes, 100);
>>>> + }
>>>> + if (++recv_ctr > MAX_RECV_THRESHOLD) {
>>>> + break;
>>>> + }
>>>> }
>>>> - data = inbuf;
>>>> -
>>>> - recd_buf_len = ncs_decode_16bit(&data);
>>>> -
>>>> - if (pfd[1].revents & POLLERR) {
>>>> - m_MDS_LOG_ERR("MDTM: Error
>>>> Recd:tipc_id=<0x%08x:%u>:errno=0x%08x",
>>>> - client_addr.addr.id.node,
>>>> client_addr.addr.id.ref, errno);
>>>> - } else if (recd_buf_len == recd_bytes) {
>>>> - uint64_t tipc_id = 0;
>>>> - uint32_t buff_dump = 0;
>>>> - tipc_id = ((uint64_t)client_addr.addr.id.node)
>>>> << 32; /* TIPC_ID=<NODE,REF> */
>>>> - tipc_id |= client_addr.addr.id.ref;
>>>> -
>>>> -#ifdef MDS_CHECKSUM_ENABLE_FLAG
>>>> - if (inbuf[2] == 1) {
>>>> - old_checksum = ((uint16_t)inbuf[3] << 8 |
>>>> inbuf[4]);
>>>> - inbuf[3] = 0;
>>>> - inbuf[4] = 0;
>>>> - new_checksum = mds_checksum(recd_bytes,
>>>> inbuf);
>>>> -
>>>> - if (old_checksum != new_checksum) {
>>>> - m_MDS_LOG_ERR
>>>> - ("CHECKSUM-MISMATCH:recvd_on_sock=%d, Tipc_id=0x%llu, Adest =
>>>> <%08x,%u>",
>>>> - recd_bytes, tipc_id,
>>>> - m_MDS_GET_NCS_NODE_ID_FROM_TIPC_NODE_ID(client_addr.addr.
>>>> - id.node),
>>>> - client_addr.addr.id.ref);
>>>> - mds_buff_dump(inbuf, recd_bytes, 100);
>>>> - abort();
>>>> - osaf_mutex_unlock_ordie(&gl_mds_library_mutex);
>>>> - continue;
>>>> - }
>>>> - mdtm_process_recv_data(&inbuf[5], recd_bytes - 5, tipc_id,
>>>> &buff_dump);
>>>> - if (buff_dump) {
>>>> - m_MDS_LOG_ERR
>>>> - ("RECV_DATA_PROCESS:recvd_on_sock=%d, Tipc_id=0x%llu, Adest =
>>>> <%08x,%u>",
>>>> - recd_bytes, tipc_id,
>>>> - m_MDS_GET_NCS_NODE_ID_FROM_TIPC_NODE_ID(client_addr.addr.
>>>> - id.node),
>>>> - client_addr.addr.id.ref);
>>>> - mds_buff_dump(inbuf, recd_bytes, 100);
>>>> - }
>>>> - } else {
>>>> - mdtm_process_recv_data(&inbuf[5], recd_bytes - 5, tipc_id,
>>>> &buff_dump);
>>>> - }
>>>> -#else
>>>> - mdtm_process_recv_data(&inbuf[2], recd_bytes -
>>>> 2, tipc_id, &buff_dump);
>>>> -#endif
>>>> - } else {
>>>> - uint64_t tipc_id;
>>>> - tipc_id = ((uint64_t)client_addr.addr.id.node)
>>>> << 32; /* TIPC_ID=<NODE,REF> */
>>>> - tipc_id |= client_addr.addr.id.ref;
>>>> -
>>>> - /* Log message that we are dropping the data */
>>>> - m_MDS_LOG_ERR
>>>> - ("LEN-MISMATCH:recvd_on_sock=%d,
>>>> size_in_mds_hdr=%d, Tipc_id= %"PRIu64", Adest = <%08x,%u>",
>>>> - recd_bytes, recd_buf_len, tipc_id,
>>>> - m_MDS_GET_NCS_NODE_ID_FROM_TIPC_NODE_ID(client_addr.addr.id.node),
>>>> - client_addr.addr.id.ref);
>>>> - mds_buff_dump(inbuf, recd_bytes, 100);
>>>> - }
>>>> - } else if (pfd[2].revents == POLLIN) {
>>>> + }
>>>> + if (pfd[FD_TMRFD].revents == POLLIN) {
>>>> m_MDS_LOG_INFO("MDTM: Processing Timer mailbox
>>>> events\n");
>>>> /* Check if destroy-event has been processed */
>>>> @@ -1396,7 +1439,7 @@ uint32_t mds_mdtm_svc_install_tipc(PW_EN
>>>> if (setsockopt(tipc_cb.BSRsock, SOL_TIPC,
>>>> TIPC_IMPORTANCE, &imp, sizeof(imp)) != 0) {
>>>> m_MDS_LOG_ERR("MDTM: Can't set socket option TIPC_IMP
>>>> err :%s\n", strerror(errno));
>>>> - assert(0);
>>>> + osafassert(0);
>>>> } else {
>>>> m_MDS_LOG_INFO("MDTM: Successfully set socket option
>>>> TIPC_IMP, svc_id = %s(%d)",
>>>> get_svc_names(svc_id), svc_id);
>>>
>>
>
_______________________________________________
Opensaf-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/opensaf-devel