Noticed on a route collector with >100 full feeds and well over 80 million prefixes.
On startup the RDE slurps in a lot of messages and then slowly processes
them. Those are mostly IMSG_UPDATE but the current code also queues
IMSG_SESSION_DOWN, IMSG_SESSION_UP and the graceful restart imsgs.
It does not queue IMSG_SESSION_ADD and this is a problem.
If the queue of a neighbor is long then an IMSG_SESSION_DOWN can be
delayed until after the IMSG_SESSION_ADD, which is sent when the session
is reestablished, has been processed. The result is that a peer is removed in the
RDE that is actually active. Most noticeable is that the log is filled
with "rde_dispatch: unknown peer id XYZ".
To solve this issue IMSG_SESSION_DOWN needs to be handled immediately.
For graceful restart the same is required. In the graceful restart case
two messages indicate a session reset (IMSG_SESSION_STALE and the new
IMSG_SESSION_NOGRACE). Depending on the AFI/SAFI settings in the GR
capability either one or the other is sent when the session is reset.
For the above 3 messages the RDE should stop all rib dump runners and more
importantly flush the peer imsg queue. All those queued messages no longer
matter.
IMSG_SESSION_UP could still be handled through the queue since it is the
first message sent after IMSG_SESSION_ADD. But I decided to also handle it
immediately.
The graceful restart imsgs IMSG_SESSION_FLUSH and IMSG_SESSION_RESTARTED
are there to clean up stale routes (either after a timeout or a successful
restart). Again it is best to handle those messages immediately and clean
up the RIB (even though a backlog of UPDATES may be present).
In this case the imsg queue should not be flushed since there may be an
active connection. Note: IMSG_SESSION_RESTARTED is sent based on an EoR
marker from an IMSG_UPDATE (so it is delayed and in order because of that).
With this only IMSG_UPDATE and IMSG_REFRESH are queued.
I'm running this diff on the route collector and some test systems and it
seems to solve the issue.
--
:wq Claudio
Index: bgpd.h
===================================================================
RCS file: /cvs/src/usr.sbin/bgpd/bgpd.h,v
retrieving revision 1.449
diff -u -p -r1.449 bgpd.h
--- bgpd.h 10 Aug 2022 14:17:01 -0000 1.449
+++ bgpd.h 26 Aug 2022 08:24:49 -0000
@@ -592,6 +592,7 @@ enum imsg_type {
IMSG_SESSION_UP,
IMSG_SESSION_DOWN,
IMSG_SESSION_STALE,
+ IMSG_SESSION_NOGRACE,
IMSG_SESSION_FLUSH,
IMSG_SESSION_RESTARTED,
IMSG_SESSION_DEPENDON,
Index: rde.c
===================================================================
RCS file: /cvs/src/usr.sbin/bgpd/rde.c,v
retrieving revision 1.564
diff -u -p -r1.564 rde.c
--- rde.c 17 Aug 2022 15:15:26 -0000 1.564
+++ rde.c 26 Aug 2022 08:24:49 -0000
@@ -358,10 +358,10 @@ rde_dispatch_imsg_session(struct imsgbuf
{
struct imsg imsg;
struct peer p;
- struct peer_config pconf;
struct ctl_show_set cset;
struct ctl_show_rib csr;
struct ctl_show_rib_request req;
+ struct session_up sup;
struct rde_peer *peer;
struct rde_aspath *asp;
struct rde_hashstats rdehash;
@@ -373,6 +373,7 @@ rde_dispatch_imsg_session(struct imsgbuf
size_t aslen;
int verbose;
uint16_t len;
+ uint8_t aid;
while (ibuf) {
if ((n = imsg_get(ibuf, &imsg)) == -1)
@@ -382,11 +383,6 @@ rde_dispatch_imsg_session(struct imsgbuf
switch (imsg.hdr.type) {
case IMSG_UPDATE:
- case IMSG_SESSION_UP:
- case IMSG_SESSION_DOWN:
- case IMSG_SESSION_STALE:
- case IMSG_SESSION_FLUSH:
- case IMSG_SESSION_RESTARTED:
case IMSG_REFRESH:
if ((peer = peer_get(imsg.hdr.peerid)) == NULL) {
log_warnx("rde_dispatch: unknown peer id %d",
@@ -396,14 +392,71 @@ rde_dispatch_imsg_session(struct imsgbuf
peer_imsg_push(peer, &imsg);
break;
case IMSG_SESSION_ADD:
- if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(pconf))
+ if (imsg.hdr.len - IMSG_HEADER_SIZE !=
+ sizeof(struct peer_config))
+ fatalx("incorrect size of session request");
+ peer = peer_add(imsg.hdr.peerid, imsg.data);
+ /* make sure rde_eval_all is on if needed. */
+ if (peer->conf.flags & PEERFLAG_EVALUATE_ALL)
+ rde_eval_all = 1;
+ break;
+ case IMSG_SESSION_UP:
+ if ((peer = peer_get(imsg.hdr.peerid)) == NULL) {
+ log_warnx("%s: unknown peer id %d",
+ "IMSG_SESSION_UP", imsg.hdr.peerid);
+ break;
+ }
+ if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(sup))
fatalx("incorrect size of session request");
- memcpy(&pconf, imsg.data, sizeof(pconf));
- peer_add(imsg.hdr.peerid, &pconf);
+ memcpy(&sup, imsg.data, sizeof(sup));
+ peer_up(peer, &sup);
/* make sure rde_eval_all is on if needed. */
- if (pconf.flags & PEERFLAG_EVALUATE_ALL)
+ if (peer_has_add_path(peer, AID_UNSPEC, CAPA_AP_SEND))
rde_eval_all = 1;
break;
+ case IMSG_SESSION_DOWN:
+ if ((peer = peer_get(imsg.hdr.peerid)) == NULL) {
+ log_warnx("%s: unknown peer id %d",
+ "IMSG_SESSION_DOWN", imsg.hdr.peerid);
+ break;
+ }
+ peer_down(peer, NULL);
+ break;
+ case IMSG_SESSION_STALE:
+ case IMSG_SESSION_NOGRACE:
+ case IMSG_SESSION_FLUSH:
+ case IMSG_SESSION_RESTARTED:
+ if ((peer = peer_get(imsg.hdr.peerid)) == NULL) {
+ log_warnx("%s: unknown peer id %d",
+ "graceful restart", imsg.hdr.peerid);
+ break;
+ }
+ if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(aid)) {
+ log_warnx("%s: wrong imsg len", __func__);
+ break;
+ }
+ memcpy(&aid, imsg.data, sizeof(aid));
+ if (aid >= AID_MAX) {
+ log_warnx("%s: bad AID", __func__);
+ break;
+ }
+
+ switch (imsg.hdr.type) {
+ case IMSG_SESSION_STALE:
+ case IMSG_SESSION_NOGRACE:
+ peer_stale(peer, aid,
+ imsg.hdr.type == IMSG_SESSION_NOGRACE);
+ break;
+ case IMSG_SESSION_FLUSH:
+ peer_flush(peer, aid, peer->staletime[aid]);
+ break;
+ case IMSG_SESSION_RESTARTED:
+ if (peer->staletime[aid])
+ peer_flush(peer, aid,
+ peer->staletime[aid]);
+ break;
+ }
+ break;
case IMSG_NETWORK_ADD:
if (imsg.hdr.len - IMSG_HEADER_SIZE !=
sizeof(struct network_config)) {
@@ -1069,9 +1122,7 @@ void
rde_dispatch_imsg_peer(struct rde_peer *peer, void *bula)
{
struct route_refresh rr;
- struct session_up sup;
struct imsg imsg;
- uint8_t aid;
if (!peer_imsg_pop(peer, &imsg))
return;
@@ -1081,48 +1132,6 @@ rde_dispatch_imsg_peer(struct rde_peer *
if (peer->state != PEER_UP)
break;
rde_update_dispatch(peer, &imsg);
- break;
- case IMSG_SESSION_UP:
- if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(sup))
- fatalx("incorrect size of session request");
- memcpy(&sup, imsg.data, sizeof(sup));
- if (peer_up(peer, &sup) == -1) {
- peer->state = PEER_DOWN;
- imsg_compose(ibuf_se, IMSG_SESSION_DOWN, peer->conf.id,
- 0, -1, NULL, 0);
- }
- /* make sure rde_eval_all is on if needed. */
- if (peer_has_add_path(peer, AID_UNSPEC, CAPA_AP_SEND))
- rde_eval_all = 1;
- break;
- case IMSG_SESSION_DOWN:
- peer_down(peer, NULL);
- break;
- case IMSG_SESSION_STALE:
- case IMSG_SESSION_FLUSH:
- case IMSG_SESSION_RESTARTED:
- if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(aid)) {
- log_warnx("%s: wrong imsg len", __func__);
- break;
- }
- memcpy(&aid, imsg.data, sizeof(aid));
- if (aid >= AID_MAX) {
- log_warnx("%s: bad AID", __func__);
- break;
- }
-
- switch (imsg.hdr.type) {
- case IMSG_SESSION_STALE:
- peer_stale(peer, aid);
- break;
- case IMSG_SESSION_FLUSH:
- peer_flush(peer, aid, peer->staletime[aid]);
- break;
- case IMSG_SESSION_RESTARTED:
- if (peer->staletime[aid])
- peer_flush(peer, aid, peer->staletime[aid]);
- break;
- }
break;
case IMSG_REFRESH:
if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(rr)) {
Index: rde.h
===================================================================
RCS file: /cvs/src/usr.sbin/bgpd/rde.h,v
retrieving revision 1.262
diff -u -p -r1.262 rde.h
--- rde.h 3 Aug 2022 08:56:23 -0000 1.262
+++ rde.h 26 Aug 2022 08:24:49 -0000
@@ -418,10 +418,10 @@ struct rde_peer *peer_get(uint32_t);
struct rde_peer *peer_match(struct ctl_neighbor *, uint32_t);
struct rde_peer *peer_add(uint32_t, struct peer_config *);
-int peer_up(struct rde_peer *, struct session_up *);
+void peer_up(struct rde_peer *, struct session_up *);
void peer_down(struct rde_peer *, void *);
void peer_flush(struct rde_peer *, uint8_t, time_t);
-void peer_stale(struct rde_peer *, uint8_t);
+void peer_stale(struct rde_peer *, uint8_t, int);
void peer_dump(struct rde_peer *, uint8_t);
void peer_begin_rrefresh(struct rde_peer *, uint8_t);
Index: rde_peer.c
===================================================================
RCS file: /cvs/src/usr.sbin/bgpd/rde_peer.c,v
retrieving revision 1.21
diff -u -p -r1.21 rde_peer.c
--- rde_peer.c 17 Aug 2022 15:15:26 -0000 1.21
+++ rde_peer.c 26 Aug 2022 08:24:49 -0000
@@ -194,7 +194,7 @@ peer_add(uint32_t id, struct peer_config
if ((peer = peer_get(id))) {
memcpy(&peer->conf, p_conf, sizeof(struct peer_config));
- return (NULL);
+ return (peer);
}
peer = calloc(1, sizeof(struct rde_peer));
@@ -408,7 +408,7 @@ rde_up_dump_done(void *ptr, uint8_t aid)
/*
* Session got established, bring peer up, load RIBs do initial table dump.
*/
-int
+void
peer_up(struct rde_peer *peer, struct session_up *sup)
{
uint8_t i;
@@ -418,6 +418,8 @@ peer_up(struct rde_peer *peer, struct se
* There is a race condition when doing PEER_ERR -> PEER_DOWN.
* So just do a full reset of the peer here.
*/
+ rib_dump_terminate(peer);
+ peer_imsg_flush(peer);
if (prefix_dump_new(peer, AID_UNSPEC, 0, NULL,
peer_adjout_clear_upcall, NULL, NULL) == -1)
fatal("%s: prefix_dump_new", __func__);
@@ -448,8 +450,6 @@ peer_up(struct rde_peer *peer, struct se
if (peer->capa.mp[i])
peer_dump(peer, i);
}
-
- return (0);
}
/*
@@ -461,8 +461,12 @@ peer_down(struct rde_peer *peer, void *b
{
peer->remote_bgpid = 0;
peer->state = PEER_DOWN;
- /* stop all pending dumps which may depend on this peer */
+ /*
+ * stop all pending dumps which may depend on this peer
+ * and flush all pending imsg from the SE.
+ */
rib_dump_terminate(peer);
+ peer_imsg_flush(peer);
/* flush Adj-RIB-Out */
if (prefix_dump_new(peer, AID_UNSPEC, 0, NULL,
@@ -474,8 +478,6 @@ peer_down(struct rde_peer *peer, void *b
peer->prefix_cnt = 0;
peer->prefix_out_cnt = 0;
- peer_imsg_flush(peer);
-
LIST_REMOVE(peer, hash_l);
LIST_REMOVE(peer, peer_l);
free(peer);
@@ -511,7 +513,7 @@ peer_flush(struct rde_peer *peer, uint8_
* is set to the current timestamp for identifying stale routes in Adj-RIB-In.
*/
void
-peer_stale(struct rde_peer *peer, uint8_t aid)
+peer_stale(struct rde_peer *peer, uint8_t aid, int flushall)
{
time_t now;
@@ -522,9 +524,19 @@ peer_stale(struct rde_peer *peer, uint8_
peer->staletime[aid] = now = getmonotime();
peer->state = PEER_DOWN;
+ /*
+ * stop all pending dumps which may depend on this peer
+ * and flush all pending imsg from the SE.
+ */
+ rib_dump_terminate(peer);
+ peer_imsg_flush(peer);
+
+ if (flushall)
+ peer_flush(peer, aid, 0);
+
/* XXX this is not quite correct */
/* mark Adj-RIB-Out stale for this peer */
- if (prefix_dump_new(peer, AID_UNSPEC, 0, NULL,
+ if (prefix_dump_new(peer, aid, 0, NULL,
peer_adjout_stale_upcall, NULL, NULL) == -1)
fatal("%s: prefix_dump_new", __func__);
Index: session.c
===================================================================
RCS file: /cvs/src/usr.sbin/bgpd/session.c,v
retrieving revision 1.433
diff -u -p -r1.433 session.c
--- session.c 17 Aug 2022 15:15:26 -0000 1.433
+++ session.c 26 Aug 2022 08:24:49 -0000
@@ -1760,7 +1760,7 @@ session_graceful_restart(struct peer *p)
aid2str(i));
p->capa.neg.grestart.flags[i] |= CAPA_GR_RESTARTING;
} else if (p->capa.neg.mp[i]) {
- if (imsg_rde(IMSG_SESSION_FLUSH, p->conf.id,
+ if (imsg_rde(IMSG_SESSION_NOGRACE, p->conf.id,
&i, sizeof(i)) == -1)
return (-1);
log_peer_warnx(&p->conf,