Hi HansN

  OK, I will review.

  In my initial testing with a concurrent application that sends large
amounts of data and a large number of up/down events, I am seeing some
problems (not sure whether they are related to this patch), so I will
do deeper testing and let you know the result.

-AVM

On 11/22/2016 5:32 PM, Hans Nordebäck wrote:
> Hi Mahesh,
>
> V3 of the patch has been sent out. The out-of-order problem mentioned below
> is addressed by tickets #2124 and #2180.
>
> /Thanks HansN
>
> -----Original Message-----
> From: A V Mahesh [mailto:mahesh.va...@oracle.com]
> Sent: 22 November 2016 10:06
> To: Hans Nordebäck <hans.nordeb...@ericsson.com>
> Cc: opensaf-devel@lists.sourceforge.net; Anders Widell 
> <anders.wid...@ericsson.com>
> Subject: Re: [devel] [PATCH 1 of 1] mds: Remove use of mutex at TIPC receive 
> (part 1) V2 [#2132]
>
> Hi HansN,
>
> On 11/22/2016 1:52 PM, Hans Nordeback wrote:
>> Hi Mahesh,
>>
>> Yes, I will send out a V3 with AndersW's comments, but I was waiting for
>> your comments.
> Ok please send V3.
>
> My major concern was the out-of-order delivery; we cannot keep that as a known
> issue in the MDS transport, because the cluster will misbehave.
>
> -AVM
>
>> I'll send out a V3 shortly.
>>
>> /Regards Hans
>>
>>
>> On 11/22/2016 09:17 AM, A V Mahesh wrote:
>>> Hi HansN
>>>
>>> On 11/9/2016 8:50 PM, Anders Widell wrote:
>>>> When looking more closely at this patch I notice that there is a
>>>> risk for starvation here. We ought to have a message counter and
>>>> break out of the receive loop after we have received, say, 1024
>>>> messages - even if there are still more messages on the socket. This
>>>> applies to both the topology socket and the data socket.
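A minimal sketch of the counter suggested above, in plain POSIX/Linux C rather than the MDS code: drain_bounded(), msg_handler and MAX_MSGS_PER_WAKEUP are hypothetical names, recv() stands in for the TIPC-specific receive, and 1024 is only the figure from the comment. Because poll() is level-triggered, returning early while data is still queued should be safe: POLLIN is reported again on the next poll() call, and in the meantime the other descriptors (timer, topology socket) get serviced.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <sys/types.h>
#include <sys/socket.h>

/* Hypothetical per-wakeup limit (the figure suggested in the review). */
enum { MAX_MSGS_PER_WAKEUP = 1024 };

typedef enum {
    DRAIN_EMPTY, /* socket ran dry: recv() reported EAGAIN/EWOULDBLOCK */
    DRAIN_LIMIT, /* limit reached: data may remain, go back to poll() */
    DRAIN_ERROR  /* any other recv() error; errno is left for the caller */
} drain_status;

/* Hypothetical callback standing in for the real message processing. */
typedef void (*msg_handler)(const void *buf, size_t len, void *ctx);

/* Read at most max_msgs datagrams from sd without blocking.
 * *received is set to the number of datagrams actually read. */
static drain_status drain_bounded(int sd, void *buf, size_t buflen,
                                  size_t max_msgs, msg_handler handle,
                                  void *ctx, size_t *received)
{
    assert(max_msgs > 0 && received != NULL);
    *received = 0;
    while (*received < max_msgs) {
        ssize_t n = recv(sd, buf, buflen, MSG_DONTWAIT);
        if (n == -1) {
            if (errno == EINTR)
                continue; /* interrupted by a signal: retry */
            if (errno == EAGAIN || errno == EWOULDBLOCK)
                return DRAIN_EMPTY;
            return DRAIN_ERROR;
        }
        if (handle != NULL)
            handle(buf, (size_t)n, ctx);
        (*received)++;
    }
    return DRAIN_LIMIT;
}
```

A caller would invoke drain_bounded(sd, buf, len, MAX_MSGS_PER_WAKEUP, ...) once per POLLIN event and return to poll() on either DRAIN_EMPTY or DRAIN_LIMIT, applying the same limit to both the topology socket and the data socket.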
>>> I thought you would change the patch to add the `break out of the receive
>>> loop after we have received, say, 1024 messages` logic.
>>>
>>> -AVM
>>>
>>> On 11/22/2016 1:19 PM, Hans Nordebäck wrote:
>>>> Hi Mahesh,
>>>>
>>>> I didn't find any review comments on V2 except from AndersW; I only
>>>> found some questions about the MSG_DONTWAIT flag. /Regards HansN
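On the MSG_DONTWAIT question, a small sketch (hypothetical helper names, Linux C) of what the flag does: it makes a single recv() call non-blocking while the descriptor itself stays in blocking mode. Setting O_NONBLOCK with fcntl() would instead change the open file description for every user of it. On an empty socket the call fails at once with EAGAIN or EWOULDBLOCK instead of sleeping.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdbool.h>
#include <sys/types.h>
#include <sys/socket.h>

/* True if O_NONBLOCK is set on the open file description itself. */
static bool fd_is_nonblocking(int fd)
{
    int flags = fcntl(fd, F_GETFL);
    assert(flags != -1);
    return (flags & O_NONBLOCK) != 0;
}

/* Non-blocking receive for this one call only; the descriptor stays
 * blocking for everyone else. Retries on EINTR; returns -1 with errno set
 * to EAGAIN/EWOULDBLOCK when nothing is queued. */
static ssize_t recv_nowait(int sd, void *buf, size_t len)
{
    ssize_t n;
    do {
        n = recv(sd, buf, len, MSG_DONTWAIT);
    } while (n == -1 && errno == EINTR);
    return n;
}
```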
>>>>
>>>> -----Original Message-----
>>>> From: A V Mahesh [mailto:mahesh.va...@oracle.com]
>>>> Sent: 22 November 2016 04:10
>>>> To: Hans Nordebäck <hans.nordeb...@ericsson.com>
>>>> Cc: opensaf-devel@lists.sourceforge.net; Anders Widell
>>>> <anders.wid...@ericsson.com>
>>>> Subject: Re: [devel] [PATCH 1 of 1] mds: Remove use of mutex at TIPC
>>>> receive (part 1) V2 [#2132]
>>>>
>>>> Hi HansN,
>>>>
>>>> Is this V3 patch after Anders' and my review comments?
>>>>
>>>> -AVM
>>>>
>>>>
>>>> On 11/21/2016 7:30 PM, Hans Nordebäck wrote:
>>>>> Hi Mahesh,
>>>>> Have you had time to review this patch?
>>>>> /Regards HansN
>>>>>
>>>>> -----Original Message-----
>>>>> From: Hans Nordeback [mailto:hans.nordeb...@ericsson.com]
>>>>> Sent: 27 October 2016 17:29
>>>>> To: Anders Widell <anders.wid...@ericsson.com>;
>>>>> mahesh.va...@oracle.com
>>>>> Cc: opensaf-devel@lists.sourceforge.net
>>>>> Subject: [devel] [PATCH 1 of 1] mds: Remove use of mutex at TIPC
>>>>> receive (part 1) V2 [#2132]
>>>>>
>>>>>     osaf/libs/core/mds/mds_dt_tipc.c |  374 +++++++++++++++++++++-----------------
>>>>>     1 files changed, 203 insertions(+), 171 deletions(-)
>>>>>
>>>>>
>>>>> This first part makes the receives on the socket descriptors non-blocking,
>>>>> so that all incoming messages are read before returning to the next
>>>>> poll call.
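A minimal sketch of the pattern described above, using two generic datagram sockets instead of the TIPC Dsock/BSRsock (FD_A, FD_B, drain_all() and poll_once() are hypothetical names). Each ready descriptor is drained until recv() reports EAGAIN/EWOULDBLOCK, and, as in the patch, the revents checks are independent if statements rather than an else-if chain, so one busy socket does not hide events on the other. This drain has no limit, which is the starvation risk Anders raised in review. The sketch also keeps poll()'s result in a signed int; in the quoted code pollres is declared unsigned int, so a -1 return (for example on EINTR) would compare greater than 0.

```c
#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <stddef.h>
#include <sys/types.h>
#include <sys/socket.h>

/* Hypothetical indices, mirroring the FD_DSOCK/FD_BSRSOCK enum in the patch. */
enum { FD_A = 0, FD_B, NUM_FDS };

/* Read every datagram currently queued on sd without blocking.
 * Returns how many were read. */
static size_t drain_all(int sd)
{
    char buf[256];
    size_t count = 0;

    for (;;) {
        ssize_t n = recv(sd, buf, sizeof buf, MSG_DONTWAIT);
        if (n >= 0) {
            count++; /* real code would process buf[0..n) here */
            continue;
        }
        if (errno == EINTR)
            continue;
        /* EAGAIN/EWOULDBLOCK means the queue is empty; any other error
         * also ends this pass (real code should log it). */
        break;
    }
    return count;
}

/* One pass of the receive loop: poll, then service every ready descriptor. */
static size_t poll_once(int fd_a, int fd_b, int timeout_ms)
{
    struct pollfd pfd[NUM_FDS] = {{0}};
    size_t total = 0;

    assert(timeout_ms >= -1);
    pfd[FD_A].fd = fd_a;
    pfd[FD_A].events = POLLIN;
    pfd[FD_B].fd = fd_b;
    pfd[FD_B].events = POLLIN;

    int rc = poll(pfd, NUM_FDS, timeout_ms); /* signed: -1 means error/EINTR */
    if (rc <= 0)
        return 0; /* timeout or error: the caller just polls again */

    /* Independent checks (no else-if), so both sockets are served. */
    if (pfd[FD_A].revents & POLLIN)
        total += drain_all(fd_a);
    if (pfd[FD_B].revents & POLLIN)
        total += drain_all(fd_b);
    return total;
}
```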
>>>>>
>>>>> diff --git a/osaf/libs/core/mds/mds_dt_tipc.c b/osaf/libs/core/mds/mds_dt_tipc.c
>>>>> --- a/osaf/libs/core/mds/mds_dt_tipc.c
>>>>> +++ b/osaf/libs/core/mds/mds_dt_tipc.c
>>>>> @@ -565,15 +565,13 @@ static uint32_t mdtm_destroy_rcv_task(vo
>>>>> ssize_t recvfrom_connectionless (int sd, void *buf, size_t nbytes, int flags,
>>>>>             struct sockaddr *from, socklen_t *addrlen)  {
>>>>> -    struct msghdr msg;
>>>>> -    struct iovec iov;
>>>>> +    struct msghdr msg = {0};
>>>>> +    struct iovec iov = {0};
>>>>>         char anc_buf[CMSG_SPACE(8) + CMSG_SPACE(1024) + CMSG_SPACE(12)];
>>>>>         struct cmsghdr *anc;
>>>>>         int has_addr;
>>>>>         int anc_data[2];
>>>>>
>>>>> -    ssize_t sz;
>>>>> -
>>>>>         has_addr = (from != NULL) && (addrlen != NULL);
>>>>>            iov.iov_base = buf;
>>>>> @@ -586,55 +584,63 @@ ssize_t recvfrom_connectionless (int sd,
>>>>>         msg.msg_control = anc_buf;
>>>>>         msg.msg_controllen = sizeof(anc_buf);
>>>>>
>>>>> -    sz = recvmsg(sd, &msg, flags);
>>>>> -    if (sz >= 0) {
>>>>> -        anc = CMSG_FIRSTHDR(&msg);
>>>>> -        if (anc == NULL) {
>>>>> -            m_MDS_LOG_DBG("MDTM: size: %d  anc is NULL", (int)sz);
>>>>> +    while (true) {
>>>>> +        ssize_t sz = recvmsg(sd, &msg, flags);
>>>>> +        if (sz == -1) {
>>>>> +            if (errno == EAGAIN || errno == EWOULDBLOCK) {
>>>>> +                return sz;
>>>>> +            } else if (errno == EINTR) {
>>>>> +                continue;
>>>>> +            } else {
>>>>> +                /* -1 indicates connection termination connectionless this not possible */
>>>>> +                abort();
>>>>> +            }
>>>>>             }
>>>>> -        while (anc != NULL) {
>>>>> -
>>>>> -            /* Receipt of a normal data message never creates the TIPC_ERRINFO
>>>>> -               and TIPC_RETDATA objects, and only creates the TIPC_DESTNAME object
>>>>> -               if the message was sent using a TIPC name or name sequence as the
>>>>> -               destination rather than a TIPC port ID*/
>>>>> -            if (anc->cmsg_type == TIPC_ERRINFO) {
>>>>> -                anc_data[0] = *((unsigned int*)(CMSG_DATA(anc) + 0));
>>>>> -                if (anc_data[0] == TIPC_ERR_OVERLOAD) {
>>>>> -                    LOG_CR("MDTM: undelivered message condition ancillary data: TIPC_ERR_OVERLOAD");
>>>>> -                    m_MDS_LOG_CRITICAL("MDTM: undelivered message condition ancillary data: TIPC_ERR_OVERLOAD");
>>>>> +        if (sz >= 0) {
>>>>> +            anc = CMSG_FIRSTHDR(&msg);
>>>>> +            if (anc == NULL) {
>>>>> +                m_MDS_LOG_DBG("MDTM: size: %d  anc is NULL", (int)sz);
>>>>> +            }
>>>>> +            while (anc != NULL) {
>>>>> +
>>>>> +                /* Receipt of a normal data message never creates the TIPC_ERRINFO
>>>>> +                   and TIPC_RETDATA objects, and only creates the TIPC_DESTNAME object
>>>>> +                   if the message was sent using a TIPC name or name sequence as the
>>>>> +                   destination rather than a TIPC port ID*/
>>>>> +                if (anc->cmsg_type == TIPC_ERRINFO) {
>>>>> +                    anc_data[0] = *((unsigned int*)(CMSG_DATA(anc) + 0));
>>>>> +                    if (anc_data[0] == TIPC_ERR_OVERLOAD) {
>>>>> +                        LOG_CR("MDTM: undelivered message condition ancillary data: TIPC_ERR_OVERLOAD");
>>>>> +                        m_MDS_LOG_CRITICAL("MDTM: undelivered message condition ancillary data: TIPC_ERR_OVERLOAD");
>>>>> +                    } else {
>>>>> +                        /* TIPC_ERRINFO - TIPC error code associated with a returned data message or a connection termination message */
>>>>> +                        m_MDS_LOG_DBG("MDTM: undelivered message condition ancillary data: TIPC_ERRINFO err : %d", anc_data[0]);
>>>>> +                    }
>>>>> +                } else if (anc->cmsg_type == TIPC_RETDATA) {
>>>>> +                    /* If we set TIPC_DEST_DROPPABLE off message (configure TIPC to return rejected messages to the sender )
>>>>> +                       we will hit this when we implement MDS retransmit lost messages, can be replaced with flow control logic */
>>>>> +                    /* TIPC_RETDATA -The contents of a returned data message */
>>>>> +                    LOG_CR("MDTM: undelivered message condition ancillary data: TIPC_RETDATA");
>>>>> +                    m_MDS_LOG_CRITICAL("MDTM: undelivered message condition ancillary data: TIPC_RETDATA");
>>>>> +                } else if (anc->cmsg_type == TIPC_DESTNAME) {
>>>>> +                    if (sz == 0) {
>>>>> +                        m_MDS_LOG_DBG("MDTM: recd bytes=0 on received on sock, abnormal/unknown  condition. Ignoring");
>>>>> +                    }
>>>>>                     } else {
>>>>> -                    /* TIPC_ERRINFO - TIPC error code associated with a returned data message or a connection termination message */
>>>>> -                    m_MDS_LOG_DBG("MDTM: undelivered message condition ancillary data: TIPC_ERRINFO err : %d", anc_data[0]);
>>>>> +                    m_MDS_LOG_INFO("MDTM: unrecognized ancillary data type %u\n",    anc->cmsg_type);
>>>>> +                    if (sz == 0) {
>>>>> +                        m_MDS_LOG_DBG("MDTM: recd bytes=0 on received on sock, abnormal/unkown  condition. Ignoring");
>>>>> +                    }
>>>>>                     }
>>>>> -            } else if (anc->cmsg_type == TIPC_RETDATA) {
>>>>> -                /* If we set TIPC_DEST_DROPPABLE off message (configure TIPC to return rejected messages to the sender )
>>>>> -                   we will hit this when we implement MDS retransmit lost messages, can be replaced with flow control logic */
>>>>> -                /* TIPC_RETDATA -The contents of a returned data message */
>>>>> -                LOG_CR("MDTM: undelivered message condition ancillary data: TIPC_RETDATA");
>>>>> -                m_MDS_LOG_CRITICAL("MDTM: undelivered message condition ancillary data: TIPC_RETDATA");
>>>>> -            } else if (anc->cmsg_type == TIPC_DESTNAME) {
>>>>> -                if (sz == 0) {
>>>>> -                    m_MDS_LOG_DBG("MDTM: recd bytes=0 on received on sock, abnormal/unknown  condition. Ignoring");
>>>>> -                }
>>>>> -            } else {
>>>>> -                m_MDS_LOG_INFO("MDTM: unrecognized ancillary data type %u\n",    anc->cmsg_type);
>>>>> -                if (sz == 0) {
>>>>> -                    m_MDS_LOG_DBG("MDTM: recd bytes=0 on received on sock, abnormal/unkown  condition. Ignoring");
>>>>> -                }
>>>>> +
>>>>> +                anc = CMSG_NXTHDR(&msg, anc);
>>>>>                 }
>>>>>
>>>>> -            anc = CMSG_NXTHDR(&msg, anc);
>>>>> +            if (has_addr)
>>>>> +                *addrlen = msg.msg_namelen;
>>>>>             }
>>>>> -
>>>>> -        if (has_addr)
>>>>> -            *addrlen = msg.msg_namelen;
>>>>> -    } else {
>>>>> -        /* -1 indicates connection termination connectionless this not possible */
>>>>> -        abort();
>>>>> +        return sz;
>>>>>         }
>>>>> -
>>>>> -    return sz;
>>>>>     }
>>>>> /*********************************************************
>>>>> @@ -651,91 +657,112 @@ ssize_t recvfrom_connectionless (int sd,
>>>>> *********************************************************/
>>>>>     static uint32_t mdtm_process_recv_events(void)  {
>>>>> +    enum {
>>>>> +        FD_DSOCK = 0,
>>>>> +        FD_BSRSOCK,
>>>>> +        FD_TMRFD,
>>>>> +        NUM_FDS
>>>>> +    };
>>>>> +
>>>>>         /*
>>>>>            STEP 1: Poll on the BSRsock and Dsock to get the events
>>>>>            if data is received process the received data
>>>>>            if discovery events are received , process the discovery events
>>>>>          */
>>>>>
>>>>> -    while (1) {
>>>>> +    while (true) {
>>>>>             unsigned int pollres;
>>>>> -        struct pollfd pfd[3];
>>>>> +        struct pollfd pfd[NUM_FDS] = {{0}};
>>>>>             struct tipc_event event;
>>>>>
>>>>> -        pfd[0].fd = tipc_cb.Dsock;
>>>>> -        pfd[0].events = POLLIN;
>>>>> -        pfd[1].fd = tipc_cb.BSRsock;
>>>>> -        pfd[1].events = POLLIN;
>>>>> -        pfd[2].fd = tipc_cb.tmr_fd;
>>>>> -        pfd[2].events = POLLIN;
>>>>> -
>>>>> -        pfd[0].revents = pfd[1].revents = pfd[2].revents = 0;
>>>>> -
>>>>> -        pollres = poll(pfd, 3, MDTM_TIPC_POLL_TIMEOUT);
>>>>> +        pfd[FD_DSOCK].fd = tipc_cb.Dsock;
>>>>> +        pfd[FD_DSOCK].events = POLLIN;
>>>>> +        pfd[FD_BSRSOCK].fd = tipc_cb.BSRsock;
>>>>> +        pfd[FD_BSRSOCK].events = POLLIN;
>>>>> +        pfd[FD_TMRFD].fd = tipc_cb.tmr_fd;
>>>>> +        pfd[FD_TMRFD].events = POLLIN;
>>>>> +
>>>>> +        pfd[FD_DSOCK].revents = pfd[FD_BSRSOCK].revents = pfd[FD_TMRFD].revents = 0;
>>>>> +
>>>>> +        pollres = poll(pfd, NUM_FDS, MDTM_TIPC_POLL_TIMEOUT);
>>>>>                if (pollres > 0) {    /* Check for EINTR and discard */
>>>>>                 memset(&event, 0, sizeof(event));
>>>>> osaf_mutex_lock_ordie(&gl_mds_library_mutex);
>>>>> -            if (pfd[0].revents == POLLIN) {
>>>>> -                if (recv(tipc_cb.Dsock, &event, sizeof(event), 0) != sizeof(event)) {
>>>>> -                    m_MDS_LOG_ERR("Unable to capture the recd event .. Continuing  err :%s", strerror(errno));
>>>>> -                    osaf_mutex_unlock_ordie(&gl_mds_library_mutex);
>>>>> -                    continue;
>>>>> -                } else {
>>>>> -                    if (NTOHL(event.event) == TIPC_PUBLISHED) {
>>>>> -
>>>>> -                        m_MDS_LOG_INFO("MDTM: Published: ");
>>>>> -                        m_MDS_LOG_INFO("MDTM: <%u,%u,%u> port id <0x%08x:%u>\n",
>>>>> -                                   NTOHL(event.s.seq.type), NTOHL(event.found_lower),
>>>>> -                                   NTOHL(event.found_upper), NTOHL(event.port.node),
>>>>> -                                   NTOHL(event.port.ref));
>>>>> -
>>>>> -                        if ( NCSCC_RC_SUCCESS != mdtm_process_discovery_events(TIPC_PUBLISHED,
>>>>> -                                event)) {
>>>>> -                            m_MDS_LOG_INFO("MDTM: Published Event processing status: F");
>>>>> +            if (pfd[FD_DSOCK].revents == POLLIN) {
>>>>> +                while(true) {
>>>>> +                    ssize_t rc = recv(tipc_cb.Dsock, &event, sizeof(event), MSG_DONTWAIT);
>>>>> +                    if (rc == -1) {
>>>>> +                        if (errno == EAGAIN || errno == EWOULDBLOCK) {
>>>>> +                            break;
>>>>> +                        } else if (errno == EINTR) {
>>>>> +                            continue;
>>>>> +                        } else {
>>>>> +                            m_MDS_LOG_ERR("Unable to capture the recd event .. Continuing  err :%s", strerror(errno));
>>>>> +                            break;
>>>>>                             }
>>>>> -                    } else if (NTOHL(event.event) == TIPC_WITHDRAWN) {
>>>>> -                        m_MDS_LOG_INFO("MDTM: Withdrawn: ");
>>>>> -                        m_MDS_LOG_INFO("MDTM: <%u,%u,%u> port id <0x%08x:%u>\n",
>>>>> -                                   NTOHL(event.s.seq.type), NTOHL(event.found_lower),
>>>>> -                                   NTOHL(event.found_upper), NTOHL(event.port.node),
>>>>> -                                   NTOHL(event.port.ref));
>>>>> -
>>>>> -                        if ( NCSCC_RC_SUCCESS != mdtm_process_discovery_events(TIPC_WITHDRAWN,
>>>>> -                                event)) {
>>>>> -                            m_MDS_LOG_INFO("MDTM: Withdrawn event processing status: F");
>>>>> +                    } else {
>>>>> +                        if (rc != sizeof(event)) {
>>>>> +                            m_MDS_LOG_ERR("Unable to capture the recd event .. Continuing  err :%s", strerror(errno));
>>>>> +                            break;
>>>>> +                        } else {
>>>>> +                            if (NTOHL(event.event) == TIPC_PUBLISHED) {
>>>>> +
>>>>> +                                m_MDS_LOG_INFO("MDTM: Published: ");
>>>>> +                                m_MDS_LOG_INFO("MDTM: <%u,%u,%u> port id <0x%08x:%u>\n",
>>>>> +                                           NTOHL(event.s.seq.type), NTOHL(event.found_lower),
>>>>> +                                           NTOHL(event.found_upper), NTOHL(event.port.node),
>>>>> +                                           NTOHL(event.port.ref));
>>>>> +
>>>>> +                                if ( NCSCC_RC_SUCCESS != mdtm_process_discovery_events(TIPC_PUBLISHED,
>>>>> +                                        event)) {
>>>>> +                                    m_MDS_LOG_INFO("MDTM: Published Event processing status: F");
>>>>> +                                }
>>>>> +                            } else if (NTOHL(event.event) == TIPC_WITHDRAWN) {
>>>>> +                                m_MDS_LOG_INFO("MDTM: Withdrawn: ");
>>>>> +                                m_MDS_LOG_INFO("MDTM: <%u,%u,%u> port id <0x%08x:%u>\n",
>>>>> +                                           NTOHL(event.s.seq.type), NTOHL(event.found_lower),
>>>>> +                                           NTOHL(event.found_upper), NTOHL(event.port.node),
>>>>> +                                           NTOHL(event.port.ref));
>>>>> +
>>>>> +                                if ( NCSCC_RC_SUCCESS != mdtm_process_discovery_events(TIPC_WITHDRAWN,
>>>>> +                                        event)) {
>>>>> +                                    m_MDS_LOG_INFO("MDTM: Withdrawn event processing status: F");
>>>>> +                                }
>>>>> +                            } else if (NTOHL(event.event) == TIPC_SUBSCR_TIMEOUT) {
>>>>> +                                /* As the timeout passed in infinite, No need to check for the Timeout */
>>>>> +                                m_MDS_LOG_ERR("MDTM: Timeou Event");
>>>>> +                                m_MDS_LOG_INFO("MDTM: Timeou Event");
>>>>> +                                m_MDS_LOG_INFO("MDTM: <%u,%u,%u> port id <0x%08x:%u>\n",
>>>>> +                                           NTOHL(event.s.seq.type), NTOHL(event.found_lower),
>>>>> +                                           NTOHL(event.found_upper), NTOHL(event.port.node),
>>>>> +                                           NTOHL(event.port.ref));
>>>>> +                            } else {
>>>>> +                                m_MDS_LOG_ERR("MDTM: Unknown Event");
>>>>> +                                /* This should never come */
>>>>> +                                assert(0);
>>>>> +                            }
>>>>>                             }
>>>>> -                    } else if (NTOHL(event.event) == TIPC_SUBSCR_TIMEOUT) {
>>>>> -                        /* As the timeout passed in infinite, No need to check for the Timeout */
>>>>> -                        m_MDS_LOG_ERR("MDTM: Timeou Event");
>>>>> -                        m_MDS_LOG_INFO("MDTM: Timeou Event");
>>>>> -                        m_MDS_LOG_INFO("MDTM: <%u,%u,%u> port id <0x%08x:%u>\n",
>>>>> -                                   NTOHL(event.s.seq.type), NTOHL(event.found_lower),
>>>>> -                                   NTOHL(event.found_upper), NTOHL(event.port.node),
>>>>> -                                   NTOHL(event.port.ref));
>>>>> -                    } else {
>>>>> -                        m_MDS_LOG_ERR("MDTM: Unknown Event");
>>>>> -                        /* This should never come */
>>>>> -                        assert(0);
>>>>>                         }
>>>>>                     }
>>>>> -            } else if (pfd[0].revents & POLLHUP) {
>>>>> +            }
>>>>> +            if (pfd[FD_DSOCK].revents & POLLHUP) {
>>>>>                     /* This value is returned when the number of subscriptions made cross the tipc max_subscr limit,
>>>>>                        so no more connection to the tipc topserver is present(viz no more up/down events),
>>>>>                        so abort and exit the process */
>>>>> -                   m_MDS_LOG_CRITICAL("MDTM: POLLHUP returned on Discovery Socket, No. of subscriptions=%d",
>>>>> -                     num_subscriptions);
>>>>> +                m_MDS_LOG_CRITICAL("MDTM: POLLHUP returned on Discovery Socket, No. of subscriptions=%d",
>>>>> +                           num_subscriptions);
>>>>>                     abort();    /* This means, the process is use less as
>>>>>                                it has lost the connectivity with the topology server
>>>>>                                and will not be able to receive any UP/DOWN events */
>>>>>
>>>>> -            } else if (pfd[1].revents & POLLIN) {
>>>>> +            }
>>>>> +            if (pfd[FD_BSRSOCK].revents & POLLIN) {
>>>>>                        /* Data Received */
>>>>>                        uint8_t *inbuf = tipc_cb.recvbuf;
>>>>>                     uint8_t *data;    /* Used for decoding */
>>>>> -                uint16_t recd_bytes = 0;
>>>>>     #ifdef MDS_CHECKSUM_ENABLE_FLAG
>>>>>                     uint16_t old_checksum = 0;
>>>>>                     uint16_t new_checksum = 0;
>>>>> @@ -743,78 +770,83 @@ static uint32_t mdtm_process_recv_events
>>>>>                     struct sockaddr_tipc client_addr;
>>>>>                     socklen_t alen = sizeof(client_addr);
>>>>>
>>>>> -                uint16_t recd_buf_len = 0;
>>>>>                     m_MDS_LOG_INFO("MDTM: Data received: Processing data ");
>>>>>
>>>>> -                recd_bytes = recvfrom_connectionless(tipc_cb.BSRsock, inbuf, TIPC_MAX_USER_MSG_SIZE, 0,
>>>>> -                        (struct sockaddr *)&client_addr, &alen);
>>>>> -                if (recd_bytes == 0) {
>>>>> -                    m_MDS_LOG_DBG("MDTM: recd bytes=0 on received on sock, abnormal/unknown/hack  condition. Ignoring");
>>>>> -                    osaf_mutex_unlock_ordie(&gl_mds_library_mutex);
>>>>> -                    continue;
>>>>> +                while (true) {
>>>>> +                    uint16_t recd_buf_len = 0;
>>>>> +                    ssize_t recd_bytes = recvfrom_connectionless(tipc_cb.BSRsock, inbuf, TIPC_MAX_USER_MSG_SIZE, MSG_DONTWAIT,
>>>>> +                                         (struct sockaddr *)&client_addr, &alen);
>>>>> +                    if (recd_bytes == -1) {
>>>>> +                        m_MDS_LOG_DBG("MDTM: no more data to read");
>>>>> +                        break;
>>>>> +                    }
>>>>> +                    if (recd_bytes == 0) {
>>>>> +                        m_MDS_LOG_DBG("MDTM: recd bytes=0 on received on sock, abnormal/unknown/hack  condition. Ignoring");
>>>>> +                        break;
>>>>> +                    }
>>>>> +                    data = inbuf;
>>>>> +
>>>>> +                    recd_buf_len = ncs_decode_16bit(&data);
>>>>> +
>>>>> +                    if (pfd[FD_BSRSOCK].revents & POLLERR) {
>>>>> +                        m_MDS_LOG_ERR("MDTM: Error Recd:tipc_id=<0x%08x:%u>:errno=0x%08x",
>>>>> +                                  client_addr.addr.id.node, client_addr.addr.id.ref, errno);
>>>>> +                    } else if (recd_buf_len == recd_bytes) {
>>>>> +                        uint64_t tipc_id = 0;
>>>>> +                        uint32_t buff_dump = 0;
>>>>> +                        tipc_id = ((uint64_t)client_addr.addr.id.node) << 32;    /* TIPC_ID=<NODE,REF> */
>>>>> +                        tipc_id |= client_addr.addr.id.ref;
>>>>> +
>>>>> +#ifdef MDS_CHECKSUM_ENABLE_FLAG
>>>>> +                        if (inbuf[2] == 1) {
>>>>> +                            old_checksum = ((uint16_t)inbuf[3] << 8 | inbuf[4]);
>>>>> +                            inbuf[3] = 0;
>>>>> +                            inbuf[4] = 0;
>>>>> +                            new_checksum = mds_checksum(recd_bytes, inbuf);
>>>>> +
>>>>> +                            if (old_checksum != new_checksum) {
>>>>> +                                m_MDS_LOG_ERR
>>>>> + ("CHECKSUM-MISMATCH:recvd_on_sock=%zd, Tipc_id=0x%llu, Adest = <%08x,%u>",
>>>>> +                                     recd_bytes, tipc_id,
>>>>> + m_MDS_GET_NCS_NODE_ID_FROM_TIPC_NODE_ID(client_addr.addr.
>>>>> + id.node),
>>>>> + client_addr.addr.id.ref);
>>>>> +                                mds_buff_dump(inbuf, recd_bytes, 100);
>>>>> +                                abort();
>>>>> +                                break;
>>>>> +                            }
>>>>> +                            mdtm_process_recv_data(&inbuf[5], recd_bytes - 5, tipc_id, &buff_dump);
>>>>> +                            if (buff_dump) {
>>>>> +                                m_MDS_LOG_ERR
>>>>> + ("RECV_DATA_PROCESS:recvd_on_sock=%zd, Tipc_id=0x%llu, Adest = <%08x,%u>",
>>>>> +                                     recd_bytes, tipc_id,
>>>>> + m_MDS_GET_NCS_NODE_ID_FROM_TIPC_NODE_ID(client_addr.addr.
>>>>> + id.node),
>>>>> + client_addr.addr.id.ref);
>>>>> +                                mds_buff_dump(inbuf, recd_bytes, 100);
>>>>> +                            }
>>>>> +                        } else {
>>>>> +                            mdtm_process_recv_data(&inbuf[5], recd_bytes - 5, tipc_id, &buff_dump);
>>>>> +                        }
>>>>> +#else
>>>>> +                        mdtm_process_recv_data(&inbuf[2], recd_bytes - 2, tipc_id, &buff_dump);
>>>>> +#endif
>>>>> +                    } else {
>>>>> +                        uint64_t tipc_id;
>>>>> +                        tipc_id = ((uint64_t)client_addr.addr.id.node) << 32;    /* TIPC_ID=<NODE,REF> */
>>>>> +                        tipc_id |= client_addr.addr.id.ref;
>>>>> +
>>>>> +                        /* Log message that we are dropping the data */
>>>>> +                        m_MDS_LOG_ERR
>>>>> + ("LEN-MISMATCH:recvd_on_sock=%zd, size_in_mds_hdr=%d, Tipc_id= %"PRIu64", Adest = <%08x,%u>",
>>>>> +                             recd_bytes, recd_buf_len, tipc_id,
>>>>> + m_MDS_GET_NCS_NODE_ID_FROM_TIPC_NODE_ID(client_addr.addr.id.node),
>>>>> +                             client_addr.addr.id.ref);
>>>>> +                        mds_buff_dump(inbuf, recd_bytes, 100);
>>>>> +                    }
>>>>>                     }
>>>>> -                data = inbuf;
>>>>> -
>>>>> -                recd_buf_len = ncs_decode_16bit(&data);
>>>>> -
>>>>> -                if (pfd[1].revents & POLLERR) {
>>>>> -                    m_MDS_LOG_ERR("MDTM: Error Recd:tipc_id=<0x%08x:%u>:errno=0x%08x",
>>>>> -                              client_addr.addr.id.node, client_addr.addr.id.ref, errno);
>>>>> -                } else if (recd_buf_len == recd_bytes) {
>>>>> -                    uint64_t tipc_id = 0;
>>>>> -                    uint32_t buff_dump = 0;
>>>>> -                    tipc_id = ((uint64_t)client_addr.addr.id.node) << 32;    /* TIPC_ID=<NODE,REF> */
>>>>> -                    tipc_id |= client_addr.addr.id.ref;
>>>>> -
>>>>> -#ifdef MDS_CHECKSUM_ENABLE_FLAG
>>>>> -                    if (inbuf[2] == 1) {
>>>>> -                        old_checksum = ((uint16_t)inbuf[3] << 8 | inbuf[4]);
>>>>> -                        inbuf[3] = 0;
>>>>> -                        inbuf[4] = 0;
>>>>> -                        new_checksum = mds_checksum(recd_bytes, inbuf);
>>>>> -
>>>>> -                        if (old_checksum != new_checksum) {
>>>>> -                            m_MDS_LOG_ERR
>>>>> - ("CHECKSUM-MISMATCH:recvd_on_sock=%d, Tipc_id=0x%llu, Adest = <%08x,%u>",
>>>>> -                                 recd_bytes, tipc_id,
>>>>> - m_MDS_GET_NCS_NODE_ID_FROM_TIPC_NODE_ID(client_addr.addr.
>>>>> - id.node),
>>>>> -                                 client_addr.addr.id.ref);
>>>>> -                            mds_buff_dump(inbuf, recd_bytes, 100);
>>>>> -                            abort();
>>>>> - osaf_mutex_unlock_ordie(&gl_mds_library_mutex);
>>>>> -                            continue;
>>>>> -                        }
>>>>> -                        mdtm_process_recv_data(&inbuf[5], recd_bytes - 5, tipc_id, &buff_dump);
>>>>> -                        if (buff_dump) {
>>>>> -                            m_MDS_LOG_ERR
>>>>> - ("RECV_DATA_PROCESS:recvd_on_sock=%d, Tipc_id=0x%llu, Adest = <%08x,%u>",
>>>>> -                                 recd_bytes, tipc_id,
>>>>> - m_MDS_GET_NCS_NODE_ID_FROM_TIPC_NODE_ID(client_addr.addr.
>>>>> - id.node),
>>>>> -                                 client_addr.addr.id.ref);
>>>>> -                            mds_buff_dump(inbuf, recd_bytes, 100);
>>>>> -                        }
>>>>> -                    } else {
>>>>> -                        mdtm_process_recv_data(&inbuf[5], recd_bytes - 5, tipc_id, &buff_dump);
>>>>> -                    }
>>>>> -#else
>>>>> -                    mdtm_process_recv_data(&inbuf[2], recd_bytes - 2, tipc_id, &buff_dump);
>>>>> -#endif
>>>>> -                } else {
>>>>> -                    uint64_t tipc_id;
>>>>> -                    tipc_id = ((uint64_t)client_addr.addr.id.node) << 32;    /* TIPC_ID=<NODE,REF> */
>>>>> -                    tipc_id |= client_addr.addr.id.ref;
>>>>> -
>>>>> -                    /* Log message that we are dropping the data */
>>>>> -                    m_MDS_LOG_ERR
>>>>> -                        ("LEN-MISMATCH:recvd_on_sock=%d, size_in_mds_hdr=%d,  Tipc_id= %"PRIu64", Adest = <%08x,%u>",
>>>>> -                         recd_bytes, recd_buf_len, tipc_id,
>>>>> - m_MDS_GET_NCS_NODE_ID_FROM_TIPC_NODE_ID(client_addr.addr.id.node),
>>>>> -                         client_addr.addr.id.ref);
>>>>> -                    mds_buff_dump(inbuf, recd_bytes, 100);
>>>>> -                }
>>>>> -            } else if (pfd[2].revents == POLLIN) {
>>>>> +            }
>>>>> +            if (pfd[FD_TMRFD].revents == POLLIN) {
>>>>>                     m_MDS_LOG_INFO("MDTM: Processing Timer mailbox events\n");
>>>>>                        /* Check if destroy-event has been processed */
>>>>>
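The ancillary-data loop in recvfrom_connectionless() follows the usual recvmsg() / CMSG_FIRSTHDR() / CMSG_NXTHDR() pattern. Since TIPC_ERRINFO, TIPC_RETDATA and TIPC_DESTNAME objects can only be produced with the TIPC module loaded, this sketch substitutes SCM_RIGHTS over an AF_UNIX socket to show the same walk (recv_with_cmsgs() is a hypothetical name). Two details differ from the quoted code and are offered only as suggestions, following the cmsg(3) man page: the control buffer is wrapped in a union with struct cmsghdr so it is suitably aligned, and the payload is copied out of CMSG_DATA() with memcpy() instead of being dereferenced through a cast pointer, which avoids unaligned access.

```c
#include <assert.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/uio.h>

/* Receive one datagram on sd and walk every ancillary object attached to it.
 * Returns the payload size, or -1 on error. *n_rights counts SCM_RIGHTS
 * objects; *first_fd gets the first descriptor passed (or -1 if none). */
static ssize_t recv_with_cmsgs(int sd, void *buf, size_t len,
                               int *n_rights, int *first_fd)
{
    union {
        char raw[4 * CMSG_SPACE(sizeof(int))];
        struct cmsghdr align; /* forces cmsghdr alignment of raw[] */
    } anc;
    struct iovec iov = { .iov_base = buf, .iov_len = len };
    struct msghdr msg = {0};

    assert(n_rights != NULL && first_fd != NULL);
    *n_rights = 0;
    *first_fd = -1;

    msg.msg_iov = &iov;
    msg.msg_iovlen = 1;
    msg.msg_control = anc.raw;
    msg.msg_controllen = sizeof(anc.raw);

    ssize_t sz = recvmsg(sd, &msg, 0);
    if (sz == -1)
        return -1;

    for (struct cmsghdr *c = CMSG_FIRSTHDR(&msg); c != NULL;
         c = CMSG_NXTHDR(&msg, c)) {
        if (c->cmsg_level == SOL_SOCKET && c->cmsg_type == SCM_RIGHTS) {
            if (*n_rights == 0) /* memcpy: CMSG_DATA() may be unaligned */
                memcpy(first_fd, CMSG_DATA(c), sizeof(int));
            (*n_rights)++;
        }
        /* Other (level, type) pairs would be dispatched here, the way the
         * patch dispatches TIPC_ERRINFO, TIPC_RETDATA and TIPC_DESTNAME. */
    }
    return sz;
}
```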
>>>>> _______________________________________________
>>>>> Opensaf-devel mailing list
>>>>> Opensaf-devel@lists.sourceforge.net
>>>>> https://lists.sourceforge.net/lists/listinfo/opensaf-devel

