Found today while wondering why the EOR records take multiple minutes to
be processed by bgpd on a full view.
Found the problem in a commit that is 7.5 years old, which was meant to
keep a single session from monopolising the SE. Since not all data from
a single read is processed in one go, we end up depending on additional
incoming data to process the remaining data in the buffer.
Split the read part from the data processing and process all peers in the
poll loop after reading in new data. With this the EOR shows up without
needing 4 or so KEEPALIVES trickling in.

OK?
-- 
:wq Claudio

Index: session.c
===================================================================
RCS file: /cvs/src/usr.sbin/bgpd/session.c,v
retrieving revision 1.321
diff -u -p -r1.321 session.c
--- session.c   12 Apr 2012 17:26:09 -0000      1.321
+++ session.c   8 Jul 2012 18:22:53 -0000
@@ -78,6 +78,7 @@ void  session_notification(struct peer *,
            ssize_t);
 void   session_rrefresh(struct peer *, u_int8_t);
 int    session_dispatch_msg(struct pollfd *, struct peer *);
+int    session_process_msg(struct peer *);
 int    parse_header(struct peer *, u_char *, u_int16_t *, u_int8_t *);
 int    parse_open(struct peer *);
 int    parse_update(struct peer *);
@@ -448,6 +449,9 @@ session_main(int pipe_m2s[2], int pipe_s
                        events = POLLIN;
                        if (p->wbuf.queued > 0 || p->state == STATE_CONNECT)
                                events |= POLLOUT;
+                       /* is there still work to do? */
+                       if (p->rbuf->wpos)
+                               timeout = 0;
 
                        /* poll events */
                        if (p->fd != -1 && events != 0) {
@@ -548,6 +552,10 @@ session_main(int pipe_m2s[2], int pipe_s
                        nfds -= session_dispatch_msg(&pfd[j],
                            peer_l[j - idx_listeners]);
 
+               for (p = peers; p != NULL; p = p->next)
+                       if (p->rbuf && p->rbuf->wpos)
+                               session_process_msg(p);
+
                for (; nfds > 0 && j < idx_mrts; j++)
                        if (pfd[j].revents & POLLOUT) {
                                nfds--;
@@ -1575,11 +1583,9 @@ session_rrefresh(struct peer *p, u_int8_
 int
 session_dispatch_msg(struct pollfd *pfd, struct peer *p)
 {
-       ssize_t         n, rpos, av, left;
+       ssize_t         n;
        socklen_t       len;
-       int             error, processed = 0;
-       u_int16_t       msglen;
-       u_int8_t        msgtype;
+       int             error;
 
        if (p->state == STATE_CONNECT) {
                if (pfd->revents & POLLOUT) {
@@ -1649,71 +1655,83 @@ session_dispatch_msg(struct pollfd *pfd,
                        return (1);
                }
 
-               rpos = 0;
-               av = p->rbuf->wpos + n;
+               p->rbuf->wpos += n;
                p->stats.last_read = time(NULL);
+               return (1);
+       }
+       return (0);
+}
 
-               /*
-                * session might drop to IDLE -> buffers deallocated
-                * we MUST check rbuf != NULL before use
-                */
-               for (;;) {
-                       if (rpos + MSGSIZE_HEADER > av)
-                               break;
-                       if (p->rbuf == NULL)
-                               break;
-                       if (parse_header(p, p->rbuf->buf + rpos, &msglen,
-                           &msgtype) == -1)
-                               return (0);
-                       if (rpos + msglen > av)
-                               break;
-                       p->rbuf->rptr = p->rbuf->buf + rpos;
+int
+session_process_msg(struct peer *p)
+{
+       ssize_t         rpos, av, left;
+       int             processed = 0;
+       u_int16_t       msglen;
+       u_int8_t        msgtype;
 
-                       switch (msgtype) {
-                       case OPEN:
-                               bgp_fsm(p, EVNT_RCVD_OPEN);
-                               p->stats.msg_rcvd_open++;
-                               break;
-                       case UPDATE:
-                               bgp_fsm(p, EVNT_RCVD_UPDATE);
-                               p->stats.msg_rcvd_update++;
-                               break;
-                       case NOTIFICATION:
-                               bgp_fsm(p, EVNT_RCVD_NOTIFICATION);
-                               p->stats.msg_rcvd_notification++;
-                               break;
-                       case KEEPALIVE:
-                               bgp_fsm(p, EVNT_RCVD_KEEPALIVE);
-                               p->stats.msg_rcvd_keepalive++;
-                               break;
-                       case RREFRESH:
-                               parse_refresh(p);
-                               p->stats.msg_rcvd_rrefresh++;
-                               break;
-                       default:        /* cannot happen */
-                               session_notification(p, ERR_HEADER,
-                                   ERR_HDR_TYPE, &msgtype, 1);
-                               log_warnx("received message with "
-                                   "unknown type %u", msgtype);
-                               bgp_fsm(p, EVNT_CON_FATAL);
-                       }
-                       rpos += msglen;
-                       if (++processed > MSG_PROCESS_LIMIT)
-                               break;
-               }
+       rpos = 0;
+       av = p->rbuf->wpos;
+
+       /*
+        * session might drop to IDLE -> buffers deallocated
+        * we MUST check rbuf != NULL before use
+        */
+       for (;;) {
+               if (rpos + MSGSIZE_HEADER > av)
+                       break;
                if (p->rbuf == NULL)
-                       return (1);
+                       break;
+               if (parse_header(p, p->rbuf->buf + rpos, &msglen,
+                   &msgtype) == -1)
+                       return (0);
+               if (rpos + msglen > av)
+                       break;
+               p->rbuf->rptr = p->rbuf->buf + rpos;
+
+               switch (msgtype) {
+               case OPEN:
+                       bgp_fsm(p, EVNT_RCVD_OPEN);
+                       p->stats.msg_rcvd_open++;
+                       break;
+               case UPDATE:
+                       bgp_fsm(p, EVNT_RCVD_UPDATE);
+                       p->stats.msg_rcvd_update++;
+                       break;
+               case NOTIFICATION:
+                       bgp_fsm(p, EVNT_RCVD_NOTIFICATION);
+                       p->stats.msg_rcvd_notification++;
+                       break;
+               case KEEPALIVE:
+                       bgp_fsm(p, EVNT_RCVD_KEEPALIVE);
+                       p->stats.msg_rcvd_keepalive++;
+                       break;
+               case RREFRESH:
+                       parse_refresh(p);
+                       p->stats.msg_rcvd_rrefresh++;
+                       break;
+               default:        /* cannot happen */
+                       session_notification(p, ERR_HEADER, ERR_HDR_TYPE,
+                           &msgtype, 1);
+                       log_warnx("received message with unknown type %u",
+                           msgtype);
+                       bgp_fsm(p, EVNT_CON_FATAL);
+               }
+               rpos += msglen;
+               if (++processed > MSG_PROCESS_LIMIT)
+                       break;
+       }
+       if (p->rbuf == NULL)
+               return (1);
 
-               if (rpos < av) {
-                       left = av - rpos;
-                       memcpy(&p->rbuf->buf, p->rbuf->buf + rpos, left);
-                       p->rbuf->wpos = left;
-               } else
-                       p->rbuf->wpos = 0;
+       if (rpos < av) {
+               left = av - rpos;
+               memcpy(&p->rbuf->buf, p->rbuf->buf + rpos, left);
+               p->rbuf->wpos = left;
+       } else
+               p->rbuf->wpos = 0;
 
-               return (1);
-       }
-       return (0);
+       return (1);
 }
 
 int

Reply via email to