The imsg processing in the RDE is sometimes a bit unfair. The problem is
that peers sending many UPDATEs starve out the others, especially on
initial table dumps. This comes from the fact that imsgs are processed to
completion and so the system sending more just gets more CPU.
This diff changes this and moves peer-specific imsgs to a per-peer imsg
queue where messages are enqueued and later dequeued one at a time.
This results in a quicker main poll loop since less work is done per read
operation. Also, on startup peers finish their initial sync in a time
proportional to their table size. Additionally, a big peer flapping will
have less influence on the processing speed of other updates or withdraws.
This can be further optimized, but let's keep the diffs small for a change
:)
--
:wq Claudio
Index: Makefile
===================================================================
RCS file: /cvs/src/usr.sbin/bgpd/Makefile,v
retrieving revision 1.35
diff -u -p -r1.35 Makefile
--- Makefile 17 Jun 2019 11:02:19 -0000 1.35
+++ Makefile 30 Dec 2019 12:30:18 -0000
@@ -5,7 +5,7 @@ SRCS= bgpd.c session.c log.c logmsg.c pa
rde.c rde_rib.c rde_decide.c rde_prefix.c mrt.c kroute.c control.c \
pfkey.c rde_update.c rde_attr.c rde_community.c printconf.c \
rde_filter.c rde_sets.c rde_trie.c pftable.c name2id.c \
- util.c carp.c timer.c
+ util.c carp.c timer.c rde_peer.c
CFLAGS+= -Wall -I${.CURDIR}
CFLAGS+= -Wstrict-prototypes -Wmissing-prototypes
CFLAGS+= -Wmissing-declarations
Index: rde.c
===================================================================
RCS file: /cvs/src/usr.sbin/bgpd/rde.c,v
retrieving revision 1.493
diff -u -p -r1.493 rde.c
--- rde.c 16 Dec 2019 10:35:02 -0000 1.493
+++ rde.c 30 Dec 2019 14:01:58 -0000
@@ -49,6 +49,7 @@
void rde_sighdlr(int);
void rde_dispatch_imsg_session(struct imsgbuf *);
void rde_dispatch_imsg_parent(struct imsgbuf *);
+void rde_dispatch_imsg_peer(struct rde_peer *, void *);
void rde_update_dispatch(struct rde_peer *, struct imsg *);
int rde_update_update(struct rde_peer *, struct filterstate *,
struct bgpd_addr *, u_int8_t);
@@ -95,6 +96,8 @@ u_int8_t rde_roa_validity(struct rde_pr
void peer_init(u_int32_t);
void peer_shutdown(void);
+void peer_foreach(void (*)(struct rde_peer *, void *), void *);
+int peer_imsg_pending(void);
int peer_localaddrs(struct rde_peer *, struct bgpd_addr *);
struct rde_peer *peer_match(struct ctl_neighbor *, u_int32_t);
struct rde_peer *peer_add(u_int32_t, struct peer_config *);
@@ -266,7 +269,7 @@ rde_main(int debug, int verbose)
}
if (rib_dump_pending() || rde_update_queue_pending() ||
- nexthop_pending())
+ nexthop_pending() || peer_imsg_pending())
timeout = 0;
if (poll(pfd, i, timeout) == -1) {
@@ -305,6 +308,7 @@ rde_main(int debug, int verbose)
mctx = LIST_NEXT(mctx, entry);
}
+ peer_foreach(rde_dispatch_imsg_peer, NULL);
rib_dump_runner();
nexthop_runner();
if (ibuf_se && ibuf_se->w.queued < SESS_MSG_HIGH_MARK) {
@@ -358,7 +362,6 @@ rde_dispatch_imsg_session(struct imsgbuf
struct imsg imsg;
struct peer p;
struct peer_config pconf;
- struct session_up sup;
struct ctl_show_rib csr;
struct ctl_show_rib_request req;
struct rde_peer *peer;
@@ -370,7 +373,6 @@ rde_dispatch_imsg_session(struct imsgbuf
size_t aslen;
int verbose;
u_int16_t len;
- u_int8_t aid;
while (ibuf) {
if ((n = imsg_get(ibuf, &imsg)) == -1)
@@ -380,106 +382,24 @@ rde_dispatch_imsg_session(struct imsgbuf
switch (imsg.hdr.type) {
case IMSG_UPDATE:
- if ((peer = peer_get(imsg.hdr.peerid)) == NULL) {
- log_warnx("rde_dispatch: unknown peer id %d",
- imsg.hdr.peerid);
- break;
- }
- rde_update_dispatch(peer, &imsg);
- break;
- case IMSG_SESSION_ADD:
- if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(pconf))
- fatalx("incorrect size of session request");
- memcpy(&pconf, imsg.data, sizeof(pconf));
- peer_add(imsg.hdr.peerid, &pconf);
- break;
case IMSG_SESSION_UP:
- if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(sup))
- fatalx("incorrect size of session request");
- memcpy(&sup, imsg.data, sizeof(sup));
- if ((peer = peer_get(imsg.hdr.peerid)) == NULL) {
- log_warnx("rde_dispatch: unknown peer id %d",
- imsg.hdr.peerid);
- break;
- }
- peer_up(peer, &sup);
- break;
case IMSG_SESSION_DOWN:
- if ((peer = peer_get(imsg.hdr.peerid)) == NULL) {
- log_warnx("rde_dispatch: unknown peer id %d",
- imsg.hdr.peerid);
- break;
- }
- peer_down(peer);
- break;
case IMSG_SESSION_STALE:
- if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(aid)) {
- log_warnx("rde_dispatch: wrong imsg len");
- break;
- }
- memcpy(&aid, imsg.data, sizeof(aid));
- if (aid >= AID_MAX) {
- log_warnx("IMSG_SESSION_STALE: bad AID");
- break;
- }
- if ((peer = peer_get(imsg.hdr.peerid)) == NULL) {
- log_warnx("rde_dispatch: unknown peer id %d",
- imsg.hdr.peerid);
- break;
- }
- peer_stale(peer, aid);
- break;
case IMSG_SESSION_FLUSH:
- if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(aid)) {
- log_warnx("rde_dispatch: wrong imsg len");
- break;
- }
- memcpy(&aid, imsg.data, sizeof(aid));
- if (aid >= AID_MAX) {
- log_warnx("IMSG_SESSION_FLUSH: bad AID");
- break;
- }
- if ((peer = peer_get(imsg.hdr.peerid)) == NULL) {
- log_warnx("rde_dispatch: unknown peer id %d",
- imsg.hdr.peerid);
- break;
- }
- peer_flush(peer, aid, peer->staletime[aid]);
- break;
case IMSG_SESSION_RESTARTED:
- if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(aid)) {
- log_warnx("rde_dispatch: wrong imsg len");
- break;
- }
- memcpy(&aid, imsg.data, sizeof(aid));
- if (aid >= AID_MAX) {
- log_warnx("IMSG_SESSION_RESTARTED: bad AID");
- break;
- }
- if ((peer = peer_get(imsg.hdr.peerid)) == NULL) {
- log_warnx("rde_dispatch: unknown peer id %d",
- imsg.hdr.peerid);
- break;
- }
- if (peer->staletime[aid])
- peer_flush(peer, aid, peer->staletime[aid]);
- break;
case IMSG_REFRESH:
- if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(aid)) {
- log_warnx("rde_dispatch: wrong imsg len");
- break;
- }
- memcpy(&aid, imsg.data, sizeof(aid));
- if (aid >= AID_MAX) {
- log_warnx("IMSG_REFRESH: bad AID");
- break;
- }
if ((peer = peer_get(imsg.hdr.peerid)) == NULL) {
log_warnx("rde_dispatch: unknown peer id %d",
imsg.hdr.peerid);
break;
}
- peer_dump(peer, aid);
+ peer_imsg_push(peer, &imsg);
+ break;
+ case IMSG_SESSION_ADD:
+ if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(pconf))
+ fatalx("incorrect size of session request");
+ memcpy(&pconf, imsg.data, sizeof(pconf));
+ peer_add(imsg.hdr.peerid, &pconf);
break;
case IMSG_NETWORK_ADD:
if (imsg.hdr.len - IMSG_HEADER_SIZE !=
@@ -1054,6 +974,68 @@ rde_dispatch_imsg_parent(struct imsgbuf
}
}
+void
+rde_dispatch_imsg_peer(struct rde_peer *peer, void *bula)
+{
+ struct session_up sup;
+ struct imsg imsg;
+ u_int8_t aid;
+ /* handle at most one queued imsg of this peer per call; bula is unused */
+ if (!peer_imsg_pop(peer, &imsg))
+ return;
+ /* dispatch on the type of the dequeued imsg */
+ switch (imsg.hdr.type) {
+ case IMSG_UPDATE:
+ rde_update_dispatch(peer, &imsg);
+ break;
+ case IMSG_SESSION_UP:
+ if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(sup))
+ fatalx("incorrect size of session request");
+ memcpy(&sup, imsg.data, sizeof(sup));
+ peer_up(peer, &sup);
+ break;
+ case IMSG_SESSION_DOWN:
+ peer_down(peer); /* also flushes what is still queued for this peer */
+ break;
+ case IMSG_SESSION_STALE:
+ case IMSG_SESSION_FLUSH:
+ case IMSG_SESSION_RESTARTED:
+ case IMSG_REFRESH:
+ if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(aid)) { /* payload must be exactly one AID */
+ log_warnx("%s: wrong imsg len", __func__);
+ break;
+ }
+ memcpy(&aid, imsg.data, sizeof(aid));
+ if (aid >= AID_MAX) {
+ log_warnx("%s: bad AID", __func__);
+ break;
+ }
+ /* length and AID are valid; act on the specific message type */
+ switch (imsg.hdr.type) {
+ case IMSG_SESSION_STALE:
+ peer_stale(peer, aid);
+ break;
+ case IMSG_SESSION_FLUSH:
+ peer_flush(peer, aid, peer->staletime[aid]);
+ break;
+ case IMSG_SESSION_RESTARTED:
+ if (peer->staletime[aid]) /* only flush when a staletime is set for this AID */
+ peer_flush(peer, aid, peer->staletime[aid]);
+ break;
+ case IMSG_REFRESH:
+ peer_dump(peer, aid);
+ break;
+ }
+ break;
+ default:
+ log_warnx("%s: unhandled imsg type %d", __func__,
+ imsg.hdr.type);
+ break;
+ }
+ /* done with the dequeued imsg */
+ imsg_free(&imsg);
+}
+
/* handle routing updates from the session engine. */
void
rde_update_dispatch(struct rde_peer *peer, struct imsg *imsg)
@@ -3585,6 +3567,30 @@ peer_shutdown(void)
free(peertable.peer_hashtbl);
}
+/*
+ * Call callback(peer, arg) once for every peer in every peertable hash bucket.
+ */
+void
+peer_foreach(void (*callback)(struct rde_peer *, void *), void *arg)
+{
+ struct rde_peer *peer, *np;
+ u_int32_t i;
+
+ for (i = 0; i <= peertable.peer_hashmask; i++)
+ LIST_FOREACH_SAFE(peer, &peertable.peer_hashtbl[i], hash_l, np) /* _SAFE: callback may unlink peer (peer_down does) */
+ callback(peer, arg);
+}
+
+int
+peer_imsg_pending(void)
+{
+ int pending = 0;
+ /* peer_imsg_queued makes pending non-zero if any peer has a queued imsg */
+ peer_foreach(peer_imsg_queued, &pending);
+ /* a non-zero result makes rde_main poll with a zero timeout */
+ return pending;
+}
+
struct rde_peer *
peer_get(u_int32_t id)
{
@@ -3650,6 +3656,7 @@ peer_add(u_int32_t id, struct peer_confi
if (peer->loc_rib_id == RIB_NOTFOUND)
fatalx("King Bula's new peer met an unknown RIB");
peer->state = PEER_NONE;
+ SIMPLEQ_INIT(&peer->imsg_queue);
head = PEER_HASH(id);
@@ -3786,8 +3793,9 @@ peer_down(struct rde_peer *peer)
fatal("%s: prefix_dump_new", __func__);
peer_flush(peer, AID_UNSPEC, 0);
-
peer->prefix_cnt = 0;
+
+ peer_imsg_flush(peer);
LIST_REMOVE(peer, hash_l);
LIST_REMOVE(peer, peer_l);
Index: rde.h
===================================================================
RCS file: /cvs/src/usr.sbin/bgpd/rde.h,v
retrieving revision 1.227
diff -u -p -r1.227 rde.h
--- rde.h 30 Oct 2019 05:27:50 -0000 1.227
+++ rde.h 30 Dec 2019 14:01:00 -0000
@@ -76,10 +76,12 @@ LIST_HEAD(attr_list, attr);
LIST_HEAD(aspath_head, rde_aspath);
RB_HEAD(prefix_tree, prefix);
RB_HEAD(prefix_index, prefix);
+struct iq;
struct rde_peer {
LIST_ENTRY(rde_peer) hash_l; /* hash list over all peers */
LIST_ENTRY(rde_peer) peer_l; /* list of all peers */
+ SIMPLEQ_HEAD(, iq) imsg_queue;
struct peer_config conf;
struct bgpd_addr remote_addr;
struct bgpd_addr local_v4_addr;
@@ -368,6 +370,13 @@ u_int32_t rde_local_as(void);
int rde_decisionflags(void);
int rde_as4byte(struct rde_peer *);
struct rde_peer *peer_get(u_int32_t);
+
+/* rde_peer.c */
+void peer_imsg_push(struct rde_peer *, struct imsg *);
+int peer_imsg_pop(struct rde_peer *, struct imsg *);
+void peer_imsg_queued(struct rde_peer *, void *);
+void peer_imsg_flush(struct rde_peer *);
+
/* rde_attr.c */
int attr_write(void *, u_int16_t, u_int8_t, u_int8_t, void *,
Index: rde_peer.c
===================================================================
RCS file: rde_peer.c
diff -N rde_peer.c
--- /dev/null 1 Jan 1970 00:00:00 -0000
+++ rde_peer.c 30 Dec 2019 14:03:23 -0000
@@ -0,0 +1,100 @@
+/* $OpenBSD$ */
+
+/*
+ * Copyright (c) 2019 Claudio Jeker <[email protected]>
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+#include <sys/types.h>
+#include <sys/queue.h>
+
+#include <netinet/in.h>
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+
+#include "bgpd.h"
+#include "rde.h"
+
+struct iq {
+ SIMPLEQ_ENTRY(iq) entry; /* linkage in the imsg_queue of a struct rde_peer */
+ struct imsg imsg; /* queued message, moved in by peer_imsg_push */
+};
+
+/*
+ * Copy *src into *dst and clear src->data so that only dst refers to the data.
+ */
+static void
+imsg_move(struct imsg *dst, struct imsg *src)
+{
+ *dst = *src;
+ src->data = NULL; /* allocation was moved */
+}
+
+/*
+ * Append imsg to the tail of the peer imsg queue; imsg->data is NULL on return.
+ */
+void
+peer_imsg_push(struct rde_peer *peer, struct imsg *imsg)
+{
+ struct iq *iq;
+
+ if ((iq = calloc(1, sizeof(*iq))) == NULL)
+ fatal(NULL); /* allocation failure is treated as fatal */
+ imsg_move(&iq->imsg, imsg);
+ SIMPLEQ_INSERT_TAIL(&peer->imsg_queue, iq, entry);
+}
+
+/*
+ * Remove the first (oldest) imsg from the peer imsg queue and move it into
+ * *imsg. Returns 1 if *imsg was filled in, 0 if the queue was empty.
+ */
+int
+peer_imsg_pop(struct rde_peer *peer, struct imsg *imsg)
+{
+ struct iq *iq;
+
+ iq = SIMPLEQ_FIRST(&peer->imsg_queue);
+ if (iq == NULL)
+ return 0;
+ /* hand the message to the caller before the queue entry is released */
+ imsg_move(imsg, &iq->imsg);
+ /* only the queue entry is freed here; the imsg now belongs to *imsg */
+ SIMPLEQ_REMOVE_HEAD(&peer->imsg_queue, entry);
+ free(iq);
+
+ return 1;
+}
+
+void
+peer_imsg_queued(struct rde_peer *peer, void *arg)
+{
+ int *p = arg;
+ /* sticky flag: stays non-zero once any peer had a non-empty queue */
+ *p = *p || !SIMPLEQ_EMPTY(&peer->imsg_queue);
+}
+
+/* flush all imsg queued for a peer, freeing each payload and queue entry. */
+void
+peer_imsg_flush(struct rde_peer *peer)
+{
+	struct iq *iq;
+
+	while ((iq = SIMPLEQ_FIRST(&peer->imsg_queue)) != NULL) {
+		SIMPLEQ_REMOVE_HEAD(&peer->imsg_queue, entry);
+		/* free(iq) alone would leak the data moved in by peer_imsg_push */
+		imsg_free(&iq->imsg);
+		free(iq);
+	}
+}