Found today while wondering why the EOR records take multiple minutes to be processed by bgpd on a full view. The problem comes from a 7.5-year-old commit that was meant to keep a single session from monopolising the SE. Since not all buffered data is processed after a single read, we end up depending on additional incoming data to process the remaining data in the buffer. This diff splits the read part from the data processing and processes all peers in the poll loop after reading in new data. With this the EOR shows up without needing 4 or so KEEPALIVEs to trickle in.
OK? -- :wq Claudio Index: session.c =================================================================== RCS file: /cvs/src/usr.sbin/bgpd/session.c,v retrieving revision 1.321 diff -u -p -r1.321 session.c --- session.c 12 Apr 2012 17:26:09 -0000 1.321 +++ session.c 8 Jul 2012 18:22:53 -0000 @@ -78,6 +78,7 @@ void session_notification(struct peer *, ssize_t); void session_rrefresh(struct peer *, u_int8_t); int session_dispatch_msg(struct pollfd *, struct peer *); +int session_process_msg(struct peer *); int parse_header(struct peer *, u_char *, u_int16_t *, u_int8_t *); int parse_open(struct peer *); int parse_update(struct peer *); @@ -448,6 +449,9 @@ session_main(int pipe_m2s[2], int pipe_s events = POLLIN; if (p->wbuf.queued > 0 || p->state == STATE_CONNECT) events |= POLLOUT; + /* is there still work to do? */ + if (p->rbuf->wpos) + timeout = 0; /* poll events */ if (p->fd != -1 && events != 0) { @@ -548,6 +552,10 @@ session_main(int pipe_m2s[2], int pipe_s nfds -= session_dispatch_msg(&pfd[j], peer_l[j - idx_listeners]); + for (p = peers; p != NULL; p = p->next) + if (p->rbuf && p->rbuf->wpos) + session_process_msg(p); + for (; nfds > 0 && j < idx_mrts; j++) if (pfd[j].revents & POLLOUT) { nfds--; @@ -1575,11 +1583,9 @@ session_rrefresh(struct peer *p, u_int8_ int session_dispatch_msg(struct pollfd *pfd, struct peer *p) { - ssize_t n, rpos, av, left; + ssize_t n; socklen_t len; - int error, processed = 0; - u_int16_t msglen; - u_int8_t msgtype; + int error; if (p->state == STATE_CONNECT) { if (pfd->revents & POLLOUT) { @@ -1649,71 +1655,83 @@ session_dispatch_msg(struct pollfd *pfd, return (1); } - rpos = 0; - av = p->rbuf->wpos + n; + p->rbuf->wpos += n; p->stats.last_read = time(NULL); + return (1); + } + return (0); +} - /* - * session might drop to IDLE -> buffers deallocated - * we MUST check rbuf != NULL before use - */ - for (;;) { - if (rpos + MSGSIZE_HEADER > av) - break; - if (p->rbuf == NULL) - break; - if (parse_header(p, p->rbuf->buf + rpos, 
&msglen, - &msgtype) == -1) - return (0); - if (rpos + msglen > av) - break; - p->rbuf->rptr = p->rbuf->buf + rpos; +int +session_process_msg(struct peer *p) +{ + ssize_t rpos, av, left; + int processed = 0; + u_int16_t msglen; + u_int8_t msgtype; - switch (msgtype) { - case OPEN: - bgp_fsm(p, EVNT_RCVD_OPEN); - p->stats.msg_rcvd_open++; - break; - case UPDATE: - bgp_fsm(p, EVNT_RCVD_UPDATE); - p->stats.msg_rcvd_update++; - break; - case NOTIFICATION: - bgp_fsm(p, EVNT_RCVD_NOTIFICATION); - p->stats.msg_rcvd_notification++; - break; - case KEEPALIVE: - bgp_fsm(p, EVNT_RCVD_KEEPALIVE); - p->stats.msg_rcvd_keepalive++; - break; - case RREFRESH: - parse_refresh(p); - p->stats.msg_rcvd_rrefresh++; - break; - default: /* cannot happen */ - session_notification(p, ERR_HEADER, - ERR_HDR_TYPE, &msgtype, 1); - log_warnx("received message with " - "unknown type %u", msgtype); - bgp_fsm(p, EVNT_CON_FATAL); - } - rpos += msglen; - if (++processed > MSG_PROCESS_LIMIT) - break; - } + rpos = 0; + av = p->rbuf->wpos; + + /* + * session might drop to IDLE -> buffers deallocated + * we MUST check rbuf != NULL before use + */ + for (;;) { + if (rpos + MSGSIZE_HEADER > av) + break; if (p->rbuf == NULL) - return (1); + break; + if (parse_header(p, p->rbuf->buf + rpos, &msglen, + &msgtype) == -1) + return (0); + if (rpos + msglen > av) + break; + p->rbuf->rptr = p->rbuf->buf + rpos; + + switch (msgtype) { + case OPEN: + bgp_fsm(p, EVNT_RCVD_OPEN); + p->stats.msg_rcvd_open++; + break; + case UPDATE: + bgp_fsm(p, EVNT_RCVD_UPDATE); + p->stats.msg_rcvd_update++; + break; + case NOTIFICATION: + bgp_fsm(p, EVNT_RCVD_NOTIFICATION); + p->stats.msg_rcvd_notification++; + break; + case KEEPALIVE: + bgp_fsm(p, EVNT_RCVD_KEEPALIVE); + p->stats.msg_rcvd_keepalive++; + break; + case RREFRESH: + parse_refresh(p); + p->stats.msg_rcvd_rrefresh++; + break; + default: /* cannot happen */ + session_notification(p, ERR_HEADER, ERR_HDR_TYPE, + &msgtype, 1); + log_warnx("received message with unknown type 
%u", + msgtype); + bgp_fsm(p, EVNT_CON_FATAL); + } + rpos += msglen; + if (++processed > MSG_PROCESS_LIMIT) + break; + } + if (p->rbuf == NULL) + return (1); - if (rpos < av) { - left = av - rpos; - memcpy(&p->rbuf->buf, p->rbuf->buf + rpos, left); - p->rbuf->wpos = left; - } else - p->rbuf->wpos = 0; + if (rpos < av) { + left = av - rpos; + memcpy(&p->rbuf->buf, p->rbuf->buf + rpos, left); + p->rbuf->wpos = left; + } else + p->rbuf->wpos = 0; - return (1); - } - return (0); + return (1); } int