The imsg processing in the RDE is sometimes a bit unfair. The problem is
that peers sending many UPDATEs starve out the others, especially on
initial table dumps. This comes from the fact that imsgs are processed to
completion, and so the system sending more just gets more CPU.

This diff changes this and moves peer-specific imsgs to a per-peer imsg
queue where messages are enqueued and later dequeued one at a time.
This results in a quicker main poll loop since less work is done per read
operation. Also, on startup peers finish their initial sync in a time
roughly proportional to their table size. Additionally, a big peer flapping
will have less influence on the processing speed of other updates or withdraws.

This can be further optimized, but let's keep the diffs small for a change
:)
-- 
:wq Claudio

Index: Makefile
===================================================================
RCS file: /cvs/src/usr.sbin/bgpd/Makefile,v
retrieving revision 1.35
diff -u -p -r1.35 Makefile
--- Makefile    17 Jun 2019 11:02:19 -0000      1.35
+++ Makefile    30 Dec 2019 12:30:18 -0000
@@ -5,7 +5,7 @@ SRCS=   bgpd.c session.c log.c logmsg.c pa
        rde.c rde_rib.c rde_decide.c rde_prefix.c mrt.c kroute.c control.c \
        pfkey.c rde_update.c rde_attr.c rde_community.c printconf.c \
        rde_filter.c rde_sets.c rde_trie.c pftable.c name2id.c \
-       util.c carp.c timer.c
+       util.c carp.c timer.c rde_peer.c
 CFLAGS+= -Wall -I${.CURDIR}
 CFLAGS+= -Wstrict-prototypes -Wmissing-prototypes
 CFLAGS+= -Wmissing-declarations
Index: rde.c
===================================================================
RCS file: /cvs/src/usr.sbin/bgpd/rde.c,v
retrieving revision 1.493
diff -u -p -r1.493 rde.c
--- rde.c       16 Dec 2019 10:35:02 -0000      1.493
+++ rde.c       30 Dec 2019 14:01:58 -0000
@@ -49,6 +49,7 @@
 void            rde_sighdlr(int);
 void            rde_dispatch_imsg_session(struct imsgbuf *);
 void            rde_dispatch_imsg_parent(struct imsgbuf *);
+void            rde_dispatch_imsg_peer(struct rde_peer *, void *);
 void            rde_update_dispatch(struct rde_peer *, struct imsg *);
 int             rde_update_update(struct rde_peer *, struct filterstate *,
                     struct bgpd_addr *, u_int8_t);
@@ -95,6 +96,8 @@ u_int8_t       rde_roa_validity(struct rde_pr
 
 void            peer_init(u_int32_t);
 void            peer_shutdown(void);
+void            peer_foreach(void (*)(struct rde_peer *, void *), void *);
+int             peer_imsg_pending(void);
 int             peer_localaddrs(struct rde_peer *, struct bgpd_addr *);
 struct rde_peer *peer_match(struct ctl_neighbor *, u_int32_t);
 struct rde_peer        *peer_add(u_int32_t, struct peer_config *);
@@ -266,7 +269,7 @@ rde_main(int debug, int verbose)
                }
 
                if (rib_dump_pending() || rde_update_queue_pending() ||
-                   nexthop_pending())
+                   nexthop_pending() || peer_imsg_pending())
                        timeout = 0;
 
                if (poll(pfd, i, timeout) == -1) {
@@ -305,6 +308,7 @@ rde_main(int debug, int verbose)
                        mctx = LIST_NEXT(mctx, entry);
                }
 
+               peer_foreach(rde_dispatch_imsg_peer, NULL);
                rib_dump_runner();
                nexthop_runner();
                if (ibuf_se && ibuf_se->w.queued < SESS_MSG_HIGH_MARK) {
@@ -358,7 +362,6 @@ rde_dispatch_imsg_session(struct imsgbuf
        struct imsg              imsg;
        struct peer              p;
        struct peer_config       pconf;
-       struct session_up        sup;
        struct ctl_show_rib      csr;
        struct ctl_show_rib_request     req;
        struct rde_peer         *peer;
@@ -370,7 +373,6 @@ rde_dispatch_imsg_session(struct imsgbuf
        size_t                   aslen;
        int                      verbose;
        u_int16_t                len;
-       u_int8_t                 aid;
 
        while (ibuf) {
                if ((n = imsg_get(ibuf, &imsg)) == -1)
@@ -380,106 +382,24 @@ rde_dispatch_imsg_session(struct imsgbuf
 
                switch (imsg.hdr.type) {
                case IMSG_UPDATE:
-                       if ((peer = peer_get(imsg.hdr.peerid)) == NULL) {
-                               log_warnx("rde_dispatch: unknown peer id %d",
-                                   imsg.hdr.peerid);
-                               break;
-                       }
-                       rde_update_dispatch(peer, &imsg);
-                       break;
-               case IMSG_SESSION_ADD:
-                       if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(pconf))
-                               fatalx("incorrect size of session request");
-                       memcpy(&pconf, imsg.data, sizeof(pconf));
-                       peer_add(imsg.hdr.peerid, &pconf);
-                       break;
                case IMSG_SESSION_UP:
-                       if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(sup))
-                               fatalx("incorrect size of session request");
-                       memcpy(&sup, imsg.data, sizeof(sup));
-                       if ((peer = peer_get(imsg.hdr.peerid)) == NULL) {
-                               log_warnx("rde_dispatch: unknown peer id %d",
-                                   imsg.hdr.peerid);
-                               break;
-                       }
-                       peer_up(peer, &sup);
-                       break;
                case IMSG_SESSION_DOWN:
-                       if ((peer = peer_get(imsg.hdr.peerid)) == NULL) {
-                               log_warnx("rde_dispatch: unknown peer id %d",
-                                   imsg.hdr.peerid);
-                               break;
-                       }
-                       peer_down(peer);
-                       break;
                case IMSG_SESSION_STALE:
-                       if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(aid)) {
-                               log_warnx("rde_dispatch: wrong imsg len");
-                               break;
-                       }
-                       memcpy(&aid, imsg.data, sizeof(aid));
-                       if (aid >= AID_MAX) {
-                               log_warnx("IMSG_SESSION_STALE: bad AID");
-                               break;
-                       }
-                       if ((peer = peer_get(imsg.hdr.peerid)) == NULL) {
-                               log_warnx("rde_dispatch: unknown peer id %d",
-                                   imsg.hdr.peerid);
-                               break;
-                       }
-                       peer_stale(peer, aid);
-                       break;
                case IMSG_SESSION_FLUSH:
-                       if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(aid)) {
-                               log_warnx("rde_dispatch: wrong imsg len");
-                               break;
-                       }
-                       memcpy(&aid, imsg.data, sizeof(aid));
-                       if (aid >= AID_MAX) {
-                               log_warnx("IMSG_SESSION_FLUSH: bad AID");
-                               break;
-                       }
-                       if ((peer = peer_get(imsg.hdr.peerid)) == NULL) {
-                               log_warnx("rde_dispatch: unknown peer id %d",
-                                   imsg.hdr.peerid);
-                               break;
-                       }
-                       peer_flush(peer, aid, peer->staletime[aid]);
-                       break;
                case IMSG_SESSION_RESTARTED:
-                       if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(aid)) {
-                               log_warnx("rde_dispatch: wrong imsg len");
-                               break;
-                       }
-                       memcpy(&aid, imsg.data, sizeof(aid));
-                       if (aid >= AID_MAX) {
-                               log_warnx("IMSG_SESSION_RESTARTED: bad AID");
-                               break;
-                       }
-                       if ((peer = peer_get(imsg.hdr.peerid)) == NULL) {
-                               log_warnx("rde_dispatch: unknown peer id %d",
-                                   imsg.hdr.peerid);
-                               break;
-                       }
-                       if (peer->staletime[aid])
-                               peer_flush(peer, aid, peer->staletime[aid]);
-                       break;
                case IMSG_REFRESH:
-                       if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(aid)) {
-                               log_warnx("rde_dispatch: wrong imsg len");
-                               break;
-                       }
-                       memcpy(&aid, imsg.data, sizeof(aid));
-                       if (aid >= AID_MAX) {
-                               log_warnx("IMSG_REFRESH: bad AID");
-                               break;
-                       }
                        if ((peer = peer_get(imsg.hdr.peerid)) == NULL) {
                                log_warnx("rde_dispatch: unknown peer id %d",
                                    imsg.hdr.peerid);
                                break;
                        }
-                       peer_dump(peer, aid);
+                       peer_imsg_push(peer, &imsg);
+                       break;
+               case IMSG_SESSION_ADD:
+                       if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(pconf))
+                               fatalx("incorrect size of session request");
+                       memcpy(&pconf, imsg.data, sizeof(pconf));
+                       peer_add(imsg.hdr.peerid, &pconf);
                        break;
                case IMSG_NETWORK_ADD:
                        if (imsg.hdr.len - IMSG_HEADER_SIZE !=
@@ -1054,6 +974,68 @@ rde_dispatch_imsg_parent(struct imsgbuf 
        }
 }
 
+void
+rde_dispatch_imsg_peer(struct rde_peer *peer, void *bula)
+{
+       struct session_up sup;
+       struct imsg imsg;
+       u_int8_t aid;
+
+       if (!peer_imsg_pop(peer, &imsg))
+               return;
+
+       switch (imsg.hdr.type) {
+       case IMSG_UPDATE:
+               rde_update_dispatch(peer, &imsg);
+               break;
+       case IMSG_SESSION_UP:
+               if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(sup))
+                       fatalx("incorrect size of session request");
+               memcpy(&sup, imsg.data, sizeof(sup));
+               peer_up(peer, &sup);
+               break;
+       case IMSG_SESSION_DOWN:
+               peer_down(peer);
+               break;
+       case IMSG_SESSION_STALE:
+       case IMSG_SESSION_FLUSH:
+       case IMSG_SESSION_RESTARTED:
+       case IMSG_REFRESH:
+               if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(aid)) {
+                       log_warnx("%s: wrong imsg len", __func__);
+                       break;
+               }
+               memcpy(&aid, imsg.data, sizeof(aid));
+               if (aid >= AID_MAX) {
+                       log_warnx("%s: bad AID", __func__);
+                       break;
+               }
+
+               switch (imsg.hdr.type) {
+               case IMSG_SESSION_STALE:
+                       peer_stale(peer, aid);
+                       break;
+               case IMSG_SESSION_FLUSH:
+                       peer_flush(peer, aid, peer->staletime[aid]);
+                       break;
+               case IMSG_SESSION_RESTARTED:
+                       if (peer->staletime[aid])
+                               peer_flush(peer, aid, peer->staletime[aid]);
+                       break;
+               case IMSG_REFRESH:
+                       peer_dump(peer, aid);
+                       break;
+               }
+               break;
+       default:
+               log_warnx("%s: unhandled imsg type %d", __func__,
+                   imsg.hdr.type);
+               break;
+       }
+
+       imsg_free(&imsg);
+}
+
 /* handle routing updates from the session engine. */
 void
 rde_update_dispatch(struct rde_peer *peer, struct imsg *imsg)
@@ -3585,6 +3567,30 @@ peer_shutdown(void)
        free(peertable.peer_hashtbl);
 }
 
+/*
+ * Traverse all peers calling callback for each peer.
+ */
+void
+peer_foreach(void (*callback)(struct rde_peer *, void *), void *arg)
+{
+       struct rde_peer *peer, *np;
+       u_int32_t i;
+
+       for (i = 0; i <= peertable.peer_hashmask; i++)
+               LIST_FOREACH_SAFE(peer,  &peertable.peer_hashtbl[i], hash_l, np)
+                       callback(peer, arg);
+}
+
+int
+peer_imsg_pending(void)
+{
+       int pending = 0;
+
+       peer_foreach(peer_imsg_queued, &pending);
+
+       return pending;
+}
+
 struct rde_peer *
 peer_get(u_int32_t id)
 {
@@ -3650,6 +3656,7 @@ peer_add(u_int32_t id, struct peer_confi
        if (peer->loc_rib_id == RIB_NOTFOUND)
                fatalx("King Bula's new peer met an unknown RIB");
        peer->state = PEER_NONE;
+       SIMPLEQ_INIT(&peer->imsg_queue);
 
        head = PEER_HASH(id);
 
@@ -3786,8 +3793,9 @@ peer_down(struct rde_peer *peer)
                fatal("%s: prefix_dump_new", __func__);
 
        peer_flush(peer, AID_UNSPEC, 0);
-
        peer->prefix_cnt = 0;
+
+       peer_imsg_flush(peer);
 
        LIST_REMOVE(peer, hash_l);
        LIST_REMOVE(peer, peer_l);
Index: rde.h
===================================================================
RCS file: /cvs/src/usr.sbin/bgpd/rde.h,v
retrieving revision 1.227
diff -u -p -r1.227 rde.h
--- rde.h       30 Oct 2019 05:27:50 -0000      1.227
+++ rde.h       30 Dec 2019 14:01:00 -0000
@@ -76,10 +76,12 @@ LIST_HEAD(attr_list, attr);
 LIST_HEAD(aspath_head, rde_aspath);
 RB_HEAD(prefix_tree, prefix);
 RB_HEAD(prefix_index, prefix);
+struct iq;
 
 struct rde_peer {
        LIST_ENTRY(rde_peer)             hash_l; /* hash list over all peers */
        LIST_ENTRY(rde_peer)             peer_l; /* list of all peers */
+       SIMPLEQ_HEAD(, iq)               imsg_queue;
        struct peer_config               conf;
        struct bgpd_addr                 remote_addr;
        struct bgpd_addr                 local_v4_addr;
@@ -368,6 +370,13 @@ u_int32_t  rde_local_as(void);
 int            rde_decisionflags(void);
 int            rde_as4byte(struct rde_peer *);
 struct rde_peer        *peer_get(u_int32_t);
+
+/* rde_peer.c */
+void            peer_imsg_push(struct rde_peer *, struct imsg *);
+int             peer_imsg_pop(struct rde_peer *, struct imsg *);
+void            peer_imsg_queued(struct rde_peer *, void *);
+void            peer_imsg_flush(struct rde_peer *);
+
 
 /* rde_attr.c */
 int             attr_write(void *, u_int16_t, u_int8_t, u_int8_t, void *,
Index: rde_peer.c
===================================================================
RCS file: rde_peer.c
diff -N rde_peer.c
--- /dev/null   1 Jan 1970 00:00:00 -0000
+++ rde_peer.c  30 Dec 2019 14:03:23 -0000
@@ -0,0 +1,100 @@
+/*     $OpenBSD$ */
+
+/*
+ * Copyright (c) 2019 Claudio Jeker <[email protected]>
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+#include <sys/types.h>
+#include <sys/queue.h>
+
+#include <netinet/in.h>
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+
+#include "bgpd.h"
+#include "rde.h"
+
+struct iq {
+       SIMPLEQ_ENTRY(iq)       entry;
+       struct imsg             imsg;
+};
+
+/*
+ * move an imsg from src to dst, disconnecting any dynamic memory from src.
+ */
+static void
+imsg_move(struct imsg *dst, struct imsg *src)
+{
+       *dst = *src;
+       src->data = NULL;       /* allocation was moved */
+}
+
+/*
+ * push an imsg onto the peer imsg queue.
+ */
+void
+peer_imsg_push(struct rde_peer *peer, struct imsg *imsg)
+{
+       struct iq *iq;
+
+       if ((iq = calloc(1, sizeof(*iq))) == NULL)
+               fatal(NULL);
+       imsg_move(&iq->imsg, imsg);
+       SIMPLEQ_INSERT_TAIL(&peer->imsg_queue, iq, entry);
+}
+
+/*
+ * pop first imsg from peer imsg queue and move it into imsg argument.
+ * Returns 1 if an element is returned else 0.
+ */
+int
+peer_imsg_pop(struct rde_peer *peer, struct imsg *imsg)
+{
+       struct iq *iq;
+
+       iq = SIMPLEQ_FIRST(&peer->imsg_queue);
+       if (iq == NULL)
+               return 0;
+
+       imsg_move(imsg, &iq->imsg);
+
+       SIMPLEQ_REMOVE_HEAD(&peer->imsg_queue, entry);
+       free(iq);
+
+       return 1;
+}
+
+void
+peer_imsg_queued(struct rde_peer *peer, void *arg)
+{
+       int *p = arg;
+
+       *p = *p || !SIMPLEQ_EMPTY(&peer->imsg_queue);
+}
+
+/*
+ * flush all imsg queued for a peer.
+ */
+void
+peer_imsg_flush(struct rde_peer *peer)
+{
+       struct iq *iq;
+
+       while ((iq = SIMPLEQ_FIRST(&peer->imsg_queue)) != NULL) {
+               SIMPLEQ_REMOVE_HEAD(&peer->imsg_queue, entry);
+               free(iq);
+       }
+}

Reply via email to