On Mon, Dec 30, 2019 at 06:35:28PM +0100, Claudio Jeker wrote:
> The imsg processing in the RDE is sometimes a bit unfair. The problem is
> that peers sending many UPDATES starve out the others especially on
> initial table dumps. This comes from the fact that imsgs are processed to
> completion and so the system sending more just gets more CPU.
> 
> This diff changes this and moves peer specific imsgs to a per peer imsg
> queue where messages are enqueued and later dequeued one at a time.
> This results in a quicker main poll loop since less work is done per read
> operation. Also on startup peers finish their initial sync relative to
> their table size. Additionally a big peer flapping will have less
> influence on the processing speed of other updates or withdraws.
> 
> This can be further optimized but let's keep the diffs small for a change
> :)

OK denis@

> -- 
> :wq Claudio
> 
> Index: Makefile
> ===================================================================
> RCS file: /cvs/src/usr.sbin/bgpd/Makefile,v
> retrieving revision 1.35
> diff -u -p -r1.35 Makefile
> --- Makefile  17 Jun 2019 11:02:19 -0000      1.35
> +++ Makefile  30 Dec 2019 12:30:18 -0000
> @@ -5,7 +5,7 @@ SRCS= bgpd.c session.c log.c logmsg.c pa
>       rde.c rde_rib.c rde_decide.c rde_prefix.c mrt.c kroute.c control.c \
>       pfkey.c rde_update.c rde_attr.c rde_community.c printconf.c \
>       rde_filter.c rde_sets.c rde_trie.c pftable.c name2id.c \
> -     util.c carp.c timer.c
> +     util.c carp.c timer.c rde_peer.c
>  CFLAGS+= -Wall -I${.CURDIR}
>  CFLAGS+= -Wstrict-prototypes -Wmissing-prototypes
>  CFLAGS+= -Wmissing-declarations
> Index: rde.c
> ===================================================================
> RCS file: /cvs/src/usr.sbin/bgpd/rde.c,v
> retrieving revision 1.493
> diff -u -p -r1.493 rde.c
> --- rde.c     16 Dec 2019 10:35:02 -0000      1.493
> +++ rde.c     30 Dec 2019 14:01:58 -0000
> @@ -49,6 +49,7 @@
>  void          rde_sighdlr(int);
>  void          rde_dispatch_imsg_session(struct imsgbuf *);
>  void          rde_dispatch_imsg_parent(struct imsgbuf *);
> +void          rde_dispatch_imsg_peer(struct rde_peer *, void *);
>  void          rde_update_dispatch(struct rde_peer *, struct imsg *);
>  int           rde_update_update(struct rde_peer *, struct filterstate *,
>                    struct bgpd_addr *, u_int8_t);
> @@ -95,6 +96,8 @@ u_int8_t     rde_roa_validity(struct rde_pr
>  
>  void          peer_init(u_int32_t);
>  void          peer_shutdown(void);
> +void          peer_foreach(void (*)(struct rde_peer *, void *), void *);
> +int           peer_imsg_pending(void);
>  int           peer_localaddrs(struct rde_peer *, struct bgpd_addr *);
>  struct rde_peer *peer_match(struct ctl_neighbor *, u_int32_t);
>  struct rde_peer      *peer_add(u_int32_t, struct peer_config *);
> @@ -266,7 +269,7 @@ rde_main(int debug, int verbose)
>               }
>  
>               if (rib_dump_pending() || rde_update_queue_pending() ||
> -                 nexthop_pending())
> +                 nexthop_pending() || peer_imsg_pending())
>                       timeout = 0;
>  
>               if (poll(pfd, i, timeout) == -1) {
> @@ -305,6 +308,7 @@ rde_main(int debug, int verbose)
>                       mctx = LIST_NEXT(mctx, entry);
>               }
>  
> +             peer_foreach(rde_dispatch_imsg_peer, NULL);
>               rib_dump_runner();
>               nexthop_runner();
>               if (ibuf_se && ibuf_se->w.queued < SESS_MSG_HIGH_MARK) {
> @@ -358,7 +362,6 @@ rde_dispatch_imsg_session(struct imsgbuf
>       struct imsg              imsg;
>       struct peer              p;
>       struct peer_config       pconf;
> -     struct session_up        sup;
>       struct ctl_show_rib      csr;
>       struct ctl_show_rib_request     req;
>       struct rde_peer         *peer;
> @@ -370,7 +373,6 @@ rde_dispatch_imsg_session(struct imsgbuf
>       size_t                   aslen;
>       int                      verbose;
>       u_int16_t                len;
> -     u_int8_t                 aid;
>  
>       while (ibuf) {
>               if ((n = imsg_get(ibuf, &imsg)) == -1)
> @@ -380,106 +382,24 @@ rde_dispatch_imsg_session(struct imsgbuf
>  
>               switch (imsg.hdr.type) {
>               case IMSG_UPDATE:
> -                     if ((peer = peer_get(imsg.hdr.peerid)) == NULL) {
> -                             log_warnx("rde_dispatch: unknown peer id %d",
> -                                 imsg.hdr.peerid);
> -                             break;
> -                     }
> -                     rde_update_dispatch(peer, &imsg);
> -                     break;
> -             case IMSG_SESSION_ADD:
> -                     if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(pconf))
> -                             fatalx("incorrect size of session request");
> -                     memcpy(&pconf, imsg.data, sizeof(pconf));
> -                     peer_add(imsg.hdr.peerid, &pconf);
> -                     break;
>               case IMSG_SESSION_UP:
> -                     if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(sup))
> -                             fatalx("incorrect size of session request");
> -                     memcpy(&sup, imsg.data, sizeof(sup));
> -                     if ((peer = peer_get(imsg.hdr.peerid)) == NULL) {
> -                             log_warnx("rde_dispatch: unknown peer id %d",
> -                                 imsg.hdr.peerid);
> -                             break;
> -                     }
> -                     peer_up(peer, &sup);
> -                     break;
>               case IMSG_SESSION_DOWN:
> -                     if ((peer = peer_get(imsg.hdr.peerid)) == NULL) {
> -                             log_warnx("rde_dispatch: unknown peer id %d",
> -                                 imsg.hdr.peerid);
> -                             break;
> -                     }
> -                     peer_down(peer);
> -                     break;
>               case IMSG_SESSION_STALE:
> -                     if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(aid)) {
> -                             log_warnx("rde_dispatch: wrong imsg len");
> -                             break;
> -                     }
> -                     memcpy(&aid, imsg.data, sizeof(aid));
> -                     if (aid >= AID_MAX) {
> -                             log_warnx("IMSG_SESSION_STALE: bad AID");
> -                             break;
> -                     }
> -                     if ((peer = peer_get(imsg.hdr.peerid)) == NULL) {
> -                             log_warnx("rde_dispatch: unknown peer id %d",
> -                                 imsg.hdr.peerid);
> -                             break;
> -                     }
> -                     peer_stale(peer, aid);
> -                     break;
>               case IMSG_SESSION_FLUSH:
> -                     if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(aid)) {
> -                             log_warnx("rde_dispatch: wrong imsg len");
> -                             break;
> -                     }
> -                     memcpy(&aid, imsg.data, sizeof(aid));
> -                     if (aid >= AID_MAX) {
> -                             log_warnx("IMSG_SESSION_FLUSH: bad AID");
> -                             break;
> -                     }
> -                     if ((peer = peer_get(imsg.hdr.peerid)) == NULL) {
> -                             log_warnx("rde_dispatch: unknown peer id %d",
> -                                 imsg.hdr.peerid);
> -                             break;
> -                     }
> -                     peer_flush(peer, aid, peer->staletime[aid]);
> -                     break;
>               case IMSG_SESSION_RESTARTED:
> -                     if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(aid)) {
> -                             log_warnx("rde_dispatch: wrong imsg len");
> -                             break;
> -                     }
> -                     memcpy(&aid, imsg.data, sizeof(aid));
> -                     if (aid >= AID_MAX) {
> -                             log_warnx("IMSG_SESSION_RESTARTED: bad AID");
> -                             break;
> -                     }
> -                     if ((peer = peer_get(imsg.hdr.peerid)) == NULL) {
> -                             log_warnx("rde_dispatch: unknown peer id %d",
> -                                 imsg.hdr.peerid);
> -                             break;
> -                     }
> -                     if (peer->staletime[aid])
> -                             peer_flush(peer, aid, peer->staletime[aid]);
> -                     break;
>               case IMSG_REFRESH:
> -                     if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(aid)) {
> -                             log_warnx("rde_dispatch: wrong imsg len");
> -                             break;
> -                     }
> -                     memcpy(&aid, imsg.data, sizeof(aid));
> -                     if (aid >= AID_MAX) {
> -                             log_warnx("IMSG_REFRESH: bad AID");
> -                             break;
> -                     }
>                       if ((peer = peer_get(imsg.hdr.peerid)) == NULL) {
>                               log_warnx("rde_dispatch: unknown peer id %d",
>                                   imsg.hdr.peerid);
>                               break;
>                       }
> -                     peer_dump(peer, aid);
> +                     peer_imsg_push(peer, &imsg);
> +                     break;
> +             case IMSG_SESSION_ADD:
> +                     if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(pconf))
> +                             fatalx("incorrect size of session request");
> +                     memcpy(&pconf, imsg.data, sizeof(pconf));
> +                     peer_add(imsg.hdr.peerid, &pconf);
>                       break;
>               case IMSG_NETWORK_ADD:
>                       if (imsg.hdr.len - IMSG_HEADER_SIZE !=
> @@ -1054,6 +974,68 @@ rde_dispatch_imsg_parent(struct imsgbuf 
>       }
>  }
>  
> +void
> +rde_dispatch_imsg_peer(struct rde_peer *peer, void *bula)
> +{
> +     struct session_up sup;
> +     struct imsg imsg;
> +     u_int8_t aid;
> +
> +     if (!peer_imsg_pop(peer, &imsg))
> +             return;
> +
> +     switch (imsg.hdr.type) {
> +     case IMSG_UPDATE:
> +             rde_update_dispatch(peer, &imsg);
> +             break;
> +     case IMSG_SESSION_UP:
> +             if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(sup))
> +                     fatalx("incorrect size of session request");
> +             memcpy(&sup, imsg.data, sizeof(sup));
> +             peer_up(peer, &sup);
> +             break;
> +     case IMSG_SESSION_DOWN:
> +             peer_down(peer);
> +             break;
> +     case IMSG_SESSION_STALE:
> +     case IMSG_SESSION_FLUSH:
> +     case IMSG_SESSION_RESTARTED:
> +     case IMSG_REFRESH:
> +             if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(aid)) {
> +                     log_warnx("%s: wrong imsg len", __func__);
> +                     break;
> +             }
> +             memcpy(&aid, imsg.data, sizeof(aid));
> +             if (aid >= AID_MAX) {
> +                     log_warnx("%s: bad AID", __func__);
> +                     break;
> +             }
> +
> +             switch (imsg.hdr.type) {
> +             case IMSG_SESSION_STALE:
> +                     peer_stale(peer, aid);
> +                     break;
> +             case IMSG_SESSION_FLUSH:
> +                     peer_flush(peer, aid, peer->staletime[aid]);
> +                     break;
> +             case IMSG_SESSION_RESTARTED:
> +                     if (peer->staletime[aid])
> +                             peer_flush(peer, aid, peer->staletime[aid]);
> +                     break;
> +             case IMSG_REFRESH:
> +                     peer_dump(peer, aid);
> +                     break;
> +             }
> +             break;
> +     default:
> +             log_warnx("%s: unhandled imsg type %d", __func__,
> +                 imsg.hdr.type);
> +             break;
> +     }
> +
> +     imsg_free(&imsg);
> +}
> +
>  /* handle routing updates from the session engine. */
>  void
>  rde_update_dispatch(struct rde_peer *peer, struct imsg *imsg)
> @@ -3585,6 +3567,30 @@ peer_shutdown(void)
>       free(peertable.peer_hashtbl);
>  }
>  
> +/*
> + * Traverse all peers calling callback for each peer.
> + */
> +void
> +peer_foreach(void (*callback)(struct rde_peer *, void *), void *arg)
> +{
> +     struct rde_peer *peer, *np;
> +     u_int32_t i;
> +
> +     for (i = 0; i <= peertable.peer_hashmask; i++)
> +             LIST_FOREACH_SAFE(peer,  &peertable.peer_hashtbl[i], hash_l, np)
> +                     callback(peer, arg);
> +}
> +
> +int
> +peer_imsg_pending(void)
> +{
> +     int pending = 0;
> +
> +     peer_foreach(peer_imsg_queued, &pending);
> +
> +     return pending;
> +}
> +
>  struct rde_peer *
>  peer_get(u_int32_t id)
>  {
> @@ -3650,6 +3656,7 @@ peer_add(u_int32_t id, struct peer_confi
>       if (peer->loc_rib_id == RIB_NOTFOUND)
>               fatalx("King Bula's new peer met an unknown RIB");
>       peer->state = PEER_NONE;
> +     SIMPLEQ_INIT(&peer->imsg_queue);
>  
>       head = PEER_HASH(id);
>  
> @@ -3786,8 +3793,9 @@ peer_down(struct rde_peer *peer)
>               fatal("%s: prefix_dump_new", __func__);
>  
>       peer_flush(peer, AID_UNSPEC, 0);
> -
>       peer->prefix_cnt = 0;
> +
> +     peer_imsg_flush(peer);
>  
>       LIST_REMOVE(peer, hash_l);
>       LIST_REMOVE(peer, peer_l);
> Index: rde.h
> ===================================================================
> RCS file: /cvs/src/usr.sbin/bgpd/rde.h,v
> retrieving revision 1.227
> diff -u -p -r1.227 rde.h
> --- rde.h     30 Oct 2019 05:27:50 -0000      1.227
> +++ rde.h     30 Dec 2019 14:01:00 -0000
> @@ -76,10 +76,12 @@ LIST_HEAD(attr_list, attr);
>  LIST_HEAD(aspath_head, rde_aspath);
>  RB_HEAD(prefix_tree, prefix);
>  RB_HEAD(prefix_index, prefix);
> +struct iq;
>  
>  struct rde_peer {
>       LIST_ENTRY(rde_peer)             hash_l; /* hash list over all peers */
>       LIST_ENTRY(rde_peer)             peer_l; /* list of all peers */
> +     SIMPLEQ_HEAD(, iq)               imsg_queue;
>       struct peer_config               conf;
>       struct bgpd_addr                 remote_addr;
>       struct bgpd_addr                 local_v4_addr;
> @@ -368,6 +370,13 @@ u_int32_t        rde_local_as(void);
>  int          rde_decisionflags(void);
>  int          rde_as4byte(struct rde_peer *);
>  struct rde_peer      *peer_get(u_int32_t);
> +
> +/* rde_peer.c */
> +void          peer_imsg_push(struct rde_peer *, struct imsg *);
> +int           peer_imsg_pop(struct rde_peer *, struct imsg *);
> +void          peer_imsg_queued(struct rde_peer *, void *);
> +void          peer_imsg_flush(struct rde_peer *);
> +
>  
>  /* rde_attr.c */
>  int           attr_write(void *, u_int16_t, u_int8_t, u_int8_t, void *,
> Index: rde_peer.c
> ===================================================================
> RCS file: rde_peer.c
> diff -N rde_peer.c
> --- /dev/null 1 Jan 1970 00:00:00 -0000
> +++ rde_peer.c        30 Dec 2019 14:03:23 -0000
> @@ -0,0 +1,100 @@
> +/*   $OpenBSD$ */
> +
> +/*
> + * Copyright (c) 2019 Claudio Jeker <[email protected]>
> + *
> + * Permission to use, copy, modify, and distribute this software for any
> + * purpose with or without fee is hereby granted, provided that the above
> + * copyright notice and this permission notice appear in all copies.
> + *
> + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
> + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
> + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
> + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
> + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
> + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
> + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
> + */
> +#include <sys/types.h>
> +#include <sys/queue.h>
> +
> +#include <netinet/in.h>
> +
> +#include <stdlib.h>
> +#include <stdio.h>
> +#include <string.h>
> +
> +#include "bgpd.h"
> +#include "rde.h"
> +
> +struct iq {
> +     SIMPLEQ_ENTRY(iq)       entry;
> +     struct imsg             imsg;
> +};
> +
> +/*
> + * move an imsg from src to dst, disconnecting any dynamic memory from src.
> + */
> +static void
> +imsg_move(struct imsg *dst, struct imsg *src)
> +{
> +     *dst = *src;
> +     src->data = NULL;       /* allocation was moved */
> +}
> +
> +/*
> + * push an imsg onto the peer imsg queue.
> + */
> +void
> +peer_imsg_push(struct rde_peer *peer, struct imsg *imsg)
> +{
> +     struct iq *iq;
> +
> +     if ((iq = calloc(1, sizeof(*iq))) == NULL)
> +             fatal(NULL);
> +     imsg_move(&iq->imsg, imsg);
> +     SIMPLEQ_INSERT_TAIL(&peer->imsg_queue, iq, entry);
> +}
> +
> +/*
> + * pop first imsg from peer imsg queue and move it into imsg argument.
> + * Returns 1 if an element is returned else 0.
> + */
> +int
> +peer_imsg_pop(struct rde_peer *peer, struct imsg *imsg)
> +{
> +     struct iq *iq;
> +
> +     iq = SIMPLEQ_FIRST(&peer->imsg_queue);
> +     if (iq == NULL)
> +             return 0;
> +
> +     imsg_move(imsg, &iq->imsg);
> +
> +     SIMPLEQ_REMOVE_HEAD(&peer->imsg_queue, entry);
> +     free(iq);
> +
> +     return 1;
> +}
> +
> +void
> +peer_imsg_queued(struct rde_peer *peer, void *arg)
> +{
> +     int *p = arg;
> +
> +     *p = *p || !SIMPLEQ_EMPTY(&peer->imsg_queue);
> +}
> +
> +/*
> + * flush all imsg queued for a peer.
> + */
> +void
> +peer_imsg_flush(struct rde_peer *peer)
> +{
> +     struct iq *iq;
> +
> +     while ((iq = SIMPLEQ_FIRST(&peer->imsg_queue)) != NULL) {
> +             SIMPLEQ_REMOVE_HEAD(&peer->imsg_queue, entry);
> +             free(iq);
> +     }
> +}
> 

Reply via email to