On Mon, Dec 30, 2019 at 06:35:28PM +0100, Claudio Jeker wrote:
> The imsg processing in the RDE is sometimes a bit unfair. The problem is
> that peers sending many UPDATES starve out the others, especially on
> initial table dumps. This comes from the fact that imsgs are processed to
> completion and so the system sending more just gets more CPU.
>
> This diff changes this and moves peer specific imsgs to a per peer imsg
> queue where messages are enqueued and later dequeued one at a time.
> This results in a quicker main poll loop since less work is done per read
> operation. Also on startup peers finish their initial sync relative to
> their table size. Additionally a big peer flapping will have less
> influence on the processing speed of other updates or withdraws.
>
> This can be further optimized but let's keep the diffs small for a change
> :)
OK denis@
> --
> :wq Claudio
>
> Index: Makefile
> ===================================================================
> RCS file: /cvs/src/usr.sbin/bgpd/Makefile,v
> retrieving revision 1.35
> diff -u -p -r1.35 Makefile
> --- Makefile 17 Jun 2019 11:02:19 -0000 1.35
> +++ Makefile 30 Dec 2019 12:30:18 -0000
> @@ -5,7 +5,7 @@ SRCS= bgpd.c session.c log.c logmsg.c pa
> rde.c rde_rib.c rde_decide.c rde_prefix.c mrt.c kroute.c control.c \
> pfkey.c rde_update.c rde_attr.c rde_community.c printconf.c \
> rde_filter.c rde_sets.c rde_trie.c pftable.c name2id.c \
> - util.c carp.c timer.c
> + util.c carp.c timer.c rde_peer.c
> CFLAGS+= -Wall -I${.CURDIR}
> CFLAGS+= -Wstrict-prototypes -Wmissing-prototypes
> CFLAGS+= -Wmissing-declarations
> Index: rde.c
> ===================================================================
> RCS file: /cvs/src/usr.sbin/bgpd/rde.c,v
> retrieving revision 1.493
> diff -u -p -r1.493 rde.c
> --- rde.c 16 Dec 2019 10:35:02 -0000 1.493
> +++ rde.c 30 Dec 2019 14:01:58 -0000
> @@ -49,6 +49,7 @@
> void rde_sighdlr(int);
> void rde_dispatch_imsg_session(struct imsgbuf *);
> void rde_dispatch_imsg_parent(struct imsgbuf *);
> +void rde_dispatch_imsg_peer(struct rde_peer *, void *);
> void rde_update_dispatch(struct rde_peer *, struct imsg *);
> int rde_update_update(struct rde_peer *, struct filterstate *,
> struct bgpd_addr *, u_int8_t);
> @@ -95,6 +96,8 @@ u_int8_t rde_roa_validity(struct rde_pr
>
> void peer_init(u_int32_t);
> void peer_shutdown(void);
> +void peer_foreach(void (*)(struct rde_peer *, void *), void *);
> +int peer_imsg_pending(void);
> int peer_localaddrs(struct rde_peer *, struct bgpd_addr *);
> struct rde_peer *peer_match(struct ctl_neighbor *, u_int32_t);
> struct rde_peer *peer_add(u_int32_t, struct peer_config *);
> @@ -266,7 +269,7 @@ rde_main(int debug, int verbose)
> }
>
> if (rib_dump_pending() || rde_update_queue_pending() ||
> - nexthop_pending())
> + nexthop_pending() || peer_imsg_pending())
> timeout = 0;
>
> if (poll(pfd, i, timeout) == -1) {
> @@ -305,6 +308,7 @@ rde_main(int debug, int verbose)
> mctx = LIST_NEXT(mctx, entry);
> }
>
> + peer_foreach(rde_dispatch_imsg_peer, NULL);
> rib_dump_runner();
> nexthop_runner();
> if (ibuf_se && ibuf_se->w.queued < SESS_MSG_HIGH_MARK) {
> @@ -358,7 +362,6 @@ rde_dispatch_imsg_session(struct imsgbuf
> struct imsg imsg;
> struct peer p;
> struct peer_config pconf;
> - struct session_up sup;
> struct ctl_show_rib csr;
> struct ctl_show_rib_request req;
> struct rde_peer *peer;
> @@ -370,7 +373,6 @@ rde_dispatch_imsg_session(struct imsgbuf
> size_t aslen;
> int verbose;
> u_int16_t len;
> - u_int8_t aid;
>
> while (ibuf) {
> if ((n = imsg_get(ibuf, &imsg)) == -1)
> @@ -380,106 +382,24 @@ rde_dispatch_imsg_session(struct imsgbuf
>
> switch (imsg.hdr.type) {
> case IMSG_UPDATE:
> - if ((peer = peer_get(imsg.hdr.peerid)) == NULL) {
> - log_warnx("rde_dispatch: unknown peer id %d",
> - imsg.hdr.peerid);
> - break;
> - }
> - rde_update_dispatch(peer, &imsg);
> - break;
> - case IMSG_SESSION_ADD:
> - if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(pconf))
> - fatalx("incorrect size of session request");
> - memcpy(&pconf, imsg.data, sizeof(pconf));
> - peer_add(imsg.hdr.peerid, &pconf);
> - break;
> case IMSG_SESSION_UP:
> - if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(sup))
> - fatalx("incorrect size of session request");
> - memcpy(&sup, imsg.data, sizeof(sup));
> - if ((peer = peer_get(imsg.hdr.peerid)) == NULL) {
> - log_warnx("rde_dispatch: unknown peer id %d",
> - imsg.hdr.peerid);
> - break;
> - }
> - peer_up(peer, &sup);
> - break;
> case IMSG_SESSION_DOWN:
> - if ((peer = peer_get(imsg.hdr.peerid)) == NULL) {
> - log_warnx("rde_dispatch: unknown peer id %d",
> - imsg.hdr.peerid);
> - break;
> - }
> - peer_down(peer);
> - break;
> case IMSG_SESSION_STALE:
> - if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(aid)) {
> - log_warnx("rde_dispatch: wrong imsg len");
> - break;
> - }
> - memcpy(&aid, imsg.data, sizeof(aid));
> - if (aid >= AID_MAX) {
> - log_warnx("IMSG_SESSION_STALE: bad AID");
> - break;
> - }
> - if ((peer = peer_get(imsg.hdr.peerid)) == NULL) {
> - log_warnx("rde_dispatch: unknown peer id %d",
> - imsg.hdr.peerid);
> - break;
> - }
> - peer_stale(peer, aid);
> - break;
> case IMSG_SESSION_FLUSH:
> - if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(aid)) {
> - log_warnx("rde_dispatch: wrong imsg len");
> - break;
> - }
> - memcpy(&aid, imsg.data, sizeof(aid));
> - if (aid >= AID_MAX) {
> - log_warnx("IMSG_SESSION_FLUSH: bad AID");
> - break;
> - }
> - if ((peer = peer_get(imsg.hdr.peerid)) == NULL) {
> - log_warnx("rde_dispatch: unknown peer id %d",
> - imsg.hdr.peerid);
> - break;
> - }
> - peer_flush(peer, aid, peer->staletime[aid]);
> - break;
> case IMSG_SESSION_RESTARTED:
> - if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(aid)) {
> - log_warnx("rde_dispatch: wrong imsg len");
> - break;
> - }
> - memcpy(&aid, imsg.data, sizeof(aid));
> - if (aid >= AID_MAX) {
> - log_warnx("IMSG_SESSION_RESTARTED: bad AID");
> - break;
> - }
> - if ((peer = peer_get(imsg.hdr.peerid)) == NULL) {
> - log_warnx("rde_dispatch: unknown peer id %d",
> - imsg.hdr.peerid);
> - break;
> - }
> - if (peer->staletime[aid])
> - peer_flush(peer, aid, peer->staletime[aid]);
> - break;
> case IMSG_REFRESH:
> - if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(aid)) {
> - log_warnx("rde_dispatch: wrong imsg len");
> - break;
> - }
> - memcpy(&aid, imsg.data, sizeof(aid));
> - if (aid >= AID_MAX) {
> - log_warnx("IMSG_REFRESH: bad AID");
> - break;
> - }
> if ((peer = peer_get(imsg.hdr.peerid)) == NULL) {
> log_warnx("rde_dispatch: unknown peer id %d",
> imsg.hdr.peerid);
> break;
> }
> - peer_dump(peer, aid);
> + peer_imsg_push(peer, &imsg);
> + break;
> + case IMSG_SESSION_ADD:
> + if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(pconf))
> + fatalx("incorrect size of session request");
> + memcpy(&pconf, imsg.data, sizeof(pconf));
> + peer_add(imsg.hdr.peerid, &pconf);
> break;
> case IMSG_NETWORK_ADD:
> if (imsg.hdr.len - IMSG_HEADER_SIZE !=
> @@ -1054,6 +974,68 @@ rde_dispatch_imsg_parent(struct imsgbuf
> }
> }
>
> +void
> +rde_dispatch_imsg_peer(struct rde_peer *peer, void *bula) /* bula: unused */
> +{
> + struct session_up sup;
> + struct imsg imsg;
> + u_int8_t aid;
> +
> + if (!peer_imsg_pop(peer, &imsg)) /* take at most one imsg per call */
> + return;
> +
> + switch (imsg.hdr.type) {
> + case IMSG_UPDATE:
> + rde_update_dispatch(peer, &imsg);
> + break;
> + case IMSG_SESSION_UP:
> + if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(sup))
> + fatalx("incorrect size of session request");
> + memcpy(&sup, imsg.data, sizeof(sup));
> + peer_up(peer, &sup);
> + break;
> + case IMSG_SESSION_DOWN:
> + peer_down(peer); /* flushes peer's imsg queue and unlinks peer */
> + break;
> + case IMSG_SESSION_STALE:
> + case IMSG_SESSION_FLUSH:
> + case IMSG_SESSION_RESTARTED:
> + case IMSG_REFRESH: /* all four carry exactly one AID byte */
> + if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(aid)) {
> + log_warnx("%s: wrong imsg len", __func__);
> + break; /* leaves the outer switch; imsg is freed below */
> + }
> + memcpy(&aid, imsg.data, sizeof(aid));
> + if (aid >= AID_MAX) { /* aid indexes peer->staletime[] below */
> + log_warnx("%s: bad AID", __func__);
> + break;
> + }
> +
> + switch (imsg.hdr.type) { /* aid is valid; pick per-type action */
> + case IMSG_SESSION_STALE:
> + peer_stale(peer, aid);
> + break;
> + case IMSG_SESSION_FLUSH:
> + peer_flush(peer, aid, peer->staletime[aid]);
> + break;
> + case IMSG_SESSION_RESTARTED:
> + if (peer->staletime[aid]) /* flush only if a staletime is set */
> + peer_flush(peer, aid, peer->staletime[aid]);
> + break;
> + case IMSG_REFRESH:
> + peer_dump(peer, aid);
> + break;
> + }
> + break;
> + default:
> + log_warnx("%s: unhandled imsg type %d", __func__,
> + imsg.hdr.type);
> + break;
> + }
> +
> + imsg_free(&imsg); /* peer_imsg_pop() moved the payload into imsg */
> +}
> +
> /* handle routing updates from the session engine. */
> void
> rde_update_dispatch(struct rde_peer *peer, struct imsg *imsg)
> @@ -3585,6 +3567,30 @@ peer_shutdown(void)
> free(peertable.peer_hashtbl);
> }
>
> +/*
> + * Call callback(peer, arg) once for every peer in every peertable bucket.
> + */
> +void
> +peer_foreach(void (*callback)(struct rde_peer *, void *), void *arg)
> +{
> + struct rde_peer *peer, *np;
> + u_int32_t i;
> +
> + for (i = 0; i <= peertable.peer_hashmask; i++) /* mask is last index */
> + LIST_FOREACH_SAFE(peer, &peertable.peer_hashtbl[i], hash_l, np)
> + callback(peer, arg); /* may unlink peer, hence _SAFE */
> +}
> +
> +int
> +peer_imsg_pending(void)
> +{
> + int pending = 0; /* becomes non-zero once any peer has a queued imsg */
> +
> + peer_foreach(peer_imsg_queued, &pending); /* checks every peer's queue */
> +
> + return pending; /* non-zero makes rde_main() poll with timeout 0 */
> +}
> +
> struct rde_peer *
> peer_get(u_int32_t id)
> {
> @@ -3650,6 +3656,7 @@ peer_add(u_int32_t id, struct peer_confi
> if (peer->loc_rib_id == RIB_NOTFOUND)
> fatalx("King Bula's new peer met an unknown RIB");
> peer->state = PEER_NONE;
> + SIMPLEQ_INIT(&peer->imsg_queue);
>
> head = PEER_HASH(id);
>
> @@ -3786,8 +3793,9 @@ peer_down(struct rde_peer *peer)
> fatal("%s: prefix_dump_new", __func__);
>
> peer_flush(peer, AID_UNSPEC, 0);
> -
> peer->prefix_cnt = 0;
> +
> + peer_imsg_flush(peer);
>
> LIST_REMOVE(peer, hash_l);
> LIST_REMOVE(peer, peer_l);
> Index: rde.h
> ===================================================================
> RCS file: /cvs/src/usr.sbin/bgpd/rde.h,v
> retrieving revision 1.227
> diff -u -p -r1.227 rde.h
> --- rde.h 30 Oct 2019 05:27:50 -0000 1.227
> +++ rde.h 30 Dec 2019 14:01:00 -0000
> @@ -76,10 +76,12 @@ LIST_HEAD(attr_list, attr);
> LIST_HEAD(aspath_head, rde_aspath);
> RB_HEAD(prefix_tree, prefix);
> RB_HEAD(prefix_index, prefix);
> +struct iq;
>
> struct rde_peer {
> LIST_ENTRY(rde_peer) hash_l; /* hash list over all peers */
> LIST_ENTRY(rde_peer) peer_l; /* list of all peers */
> + SIMPLEQ_HEAD(, iq) imsg_queue;
> struct peer_config conf;
> struct bgpd_addr remote_addr;
> struct bgpd_addr local_v4_addr;
> @@ -368,6 +370,13 @@ u_int32_t rde_local_as(void);
> int rde_decisionflags(void);
> int rde_as4byte(struct rde_peer *);
> struct rde_peer *peer_get(u_int32_t);
> +
> +/* rde_peer.c */
> +void peer_imsg_push(struct rde_peer *, struct imsg *);
> +int peer_imsg_pop(struct rde_peer *, struct imsg *);
> +void peer_imsg_queued(struct rde_peer *, void *);
> +void peer_imsg_flush(struct rde_peer *);
> +
>
> /* rde_attr.c */
> int attr_write(void *, u_int16_t, u_int8_t, u_int8_t, void *,
> Index: rde_peer.c
> ===================================================================
> RCS file: rde_peer.c
> diff -N rde_peer.c
> --- /dev/null 1 Jan 1970 00:00:00 -0000
> +++ rde_peer.c 30 Dec 2019 14:03:23 -0000
> @@ -0,0 +1,100 @@
> +/* $OpenBSD$ */
> +
> +/*
> + * Copyright (c) 2019 Claudio Jeker <[email protected]>
> + *
> + * Permission to use, copy, modify, and distribute this software for any
> + * purpose with or without fee is hereby granted, provided that the above
> + * copyright notice and this permission notice appear in all copies.
> + *
> + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
> + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
> + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
> + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
> + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
> + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
> + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
> + */
> +#include <sys/types.h>
> +#include <sys/queue.h>
> +
> +#include <netinet/in.h>
> +
> +#include <stdlib.h>
> +#include <stdio.h>
> +#include <string.h>
> +
> +#include "bgpd.h"
> +#include "rde.h"
> +
> +struct iq { /* one queued imsg, linked on rde_peer.imsg_queue */
> + SIMPLEQ_ENTRY(iq) entry;
> + struct imsg imsg; /* holds the payload moved in by peer_imsg_push() */
> +};
> +
> +/*
> + * Shallow-copy *src into *dst, then clear src->data so only dst refers to it.
> + */
> +static void
> +imsg_move(struct imsg *dst, struct imsg *src)
> +{
> + *dst = *src; /* struct copy: header and data pointer */
> + src->data = NULL; /* allocation was moved */
> +}
> +
> +/*
> + * Append imsg to the tail of peer's imsg queue; imsg->data is NULL afterwards.
> + */
> +void
> +peer_imsg_push(struct rde_peer *peer, struct imsg *imsg)
> +{
> + struct iq *iq;
> +
> + if ((iq = calloc(1, sizeof(*iq))) == NULL)
> + fatal(NULL); /* allocation failed */
> + imsg_move(&iq->imsg, imsg); /* queue entry takes over the payload */
> + SIMPLEQ_INSERT_TAIL(&peer->imsg_queue, iq, entry);
> +}
> +
> +/*
> + * Move the oldest queued imsg of peer into *imsg and free its queue entry.
> + * Returns 1 if an imsg was handed out, 0 if the queue was empty.
> + */
> +int
> +peer_imsg_pop(struct rde_peer *peer, struct imsg *imsg)
> +{
> + struct iq *iq;
> +
> + iq = SIMPLEQ_FIRST(&peer->imsg_queue);
> + if (iq == NULL)
> + return 0; /* *imsg is left untouched */
> +
> + imsg_move(imsg, &iq->imsg); /* caller now has to imsg_free() it */
> +
> + SIMPLEQ_REMOVE_HEAD(&peer->imsg_queue, entry);
> + free(iq); /* only the wrapper; the payload went to *imsg */
> +
> + return 1;
> +}
> +
> +void
> +peer_imsg_queued(struct rde_peer *peer, void *arg)
> +{
> + int *p = arg; /* int flag supplied by peer_imsg_pending() */
> +
> + *p = *p || !SIMPLEQ_EMPTY(&peer->imsg_queue); /* sticky: never cleared */
> +}
> +
> +/* flush all imsg queued for a peer, releasing each queued payload too. */
> +void
> +peer_imsg_flush(struct rde_peer *peer)
> +{
> + struct iq *iq;
> +
> + while ((iq = SIMPLEQ_FIRST(&peer->imsg_queue)) != NULL) {
> + SIMPLEQ_REMOVE_HEAD(&peer->imsg_queue, entry);
> + /* push moved the payload in here; free(iq) alone would leak it */
> + imsg_free(&iq->imsg);
> + free(iq);
> + }
> +}
>