Noticed on a route collector with >100 full feeds and well over 80 million prefixes.
On startup the RDE slurps in a lot of messages and then slowly processes
them. Those are mostly IMSG_UPDATE but the current code also queues
IMSG_SESSION_DOWN, IMSG_SESSION_UP and the graceful restart imsgs.
It does not queue IMSG_SESSION_ADD and this is a problem.

If the queue of a neighbor is long then the handling of an
IMSG_SESSION_DOWN can be delayed until after the IMSG_SESSION_ADD, which
is sent when the session is reestablished, has been processed. The result
is that a peer is removed in the RDE that is actually active. Most
noticeable is that the log is filled with
"rde_dispatch: unknown peer id XYZ".

To solve this issue IMSG_SESSION_DOWN needs to be handled immediately.
For graceful restart the same is required. In the graceful restart case
two messages indicate a session reset (IMSG_SESSION_STALE and the new
IMSG_SESSION_NOGRACE). Depending on the AFI/SAFI settings in the GR
capability either one or the other is sent when the session is reset.

For the above 3 messages the RDE should stop all rib dump runners and more
importantly flush the peer imsg queue. All those queued messages no longer
matter.

IMSG_SESSION_UP could still be handled through the queue since it is the
first message sent after IMSG_SESSION_ADD. But I decided to also handle it
immediately.

The graceful restart imsgs IMSG_SESSION_FLUSH and IMSG_SESSION_RESTARTED
are there to clean up stale routes (either after a timeout or a successful
restart). Again it is best to handle those messages immediately and clean
up the RIB (even though a backlog of UPDATES may be present).
In this case the imsg queue should not be flushed since there may be an
active connection. Note: IMSG_SESSION_RESTARTED is sent based on an EoR
marker from an IMSG_UPDATE (so it is delayed and in order because of that).

With this only IMSG_UPDATE and IMSG_REFRESH are queued.

I'm running this diff on the route collector and some test systems and it
seems to solve the issue.
-- 
:wq Claudio

Index: bgpd.h
===================================================================
RCS file: /cvs/src/usr.sbin/bgpd/bgpd.h,v
retrieving revision 1.449
diff -u -p -r1.449 bgpd.h
--- bgpd.h      10 Aug 2022 14:17:01 -0000      1.449
+++ bgpd.h      26 Aug 2022 08:24:49 -0000
@@ -592,6 +592,7 @@ enum imsg_type {
        IMSG_SESSION_UP,
        IMSG_SESSION_DOWN,
        IMSG_SESSION_STALE,
+       IMSG_SESSION_NOGRACE,
        IMSG_SESSION_FLUSH,
        IMSG_SESSION_RESTARTED,
        IMSG_SESSION_DEPENDON,
Index: rde.c
===================================================================
RCS file: /cvs/src/usr.sbin/bgpd/rde.c,v
retrieving revision 1.564
diff -u -p -r1.564 rde.c
--- rde.c       17 Aug 2022 15:15:26 -0000      1.564
+++ rde.c       26 Aug 2022 08:24:49 -0000
@@ -358,10 +358,10 @@ rde_dispatch_imsg_session(struct imsgbuf
 {
        struct imsg              imsg;
        struct peer              p;
-       struct peer_config       pconf;
        struct ctl_show_set      cset;
        struct ctl_show_rib      csr;
        struct ctl_show_rib_request     req;
+       struct session_up        sup;
        struct rde_peer         *peer;
        struct rde_aspath       *asp;
        struct rde_hashstats     rdehash;
@@ -373,6 +373,7 @@ rde_dispatch_imsg_session(struct imsgbuf
        size_t                   aslen;
        int                      verbose;
        uint16_t                 len;
+       uint8_t                  aid;
 
        while (ibuf) {
                if ((n = imsg_get(ibuf, &imsg)) == -1)
@@ -382,11 +383,6 @@ rde_dispatch_imsg_session(struct imsgbuf
 
                switch (imsg.hdr.type) {
                case IMSG_UPDATE:
-               case IMSG_SESSION_UP:
-               case IMSG_SESSION_DOWN:
-               case IMSG_SESSION_STALE:
-               case IMSG_SESSION_FLUSH:
-               case IMSG_SESSION_RESTARTED:
                case IMSG_REFRESH:
                        if ((peer = peer_get(imsg.hdr.peerid)) == NULL) {
                                log_warnx("rde_dispatch: unknown peer id %d",
@@ -396,14 +392,71 @@ rde_dispatch_imsg_session(struct imsgbuf
                        peer_imsg_push(peer, &imsg);
                        break;
                case IMSG_SESSION_ADD:
-                       if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(pconf))
+                       if (imsg.hdr.len - IMSG_HEADER_SIZE !=
+                           sizeof(struct peer_config))
+                               fatalx("incorrect size of session request");
+                       peer = peer_add(imsg.hdr.peerid, imsg.data);
+                       /* make sure rde_eval_all is on if needed. */
+                       if (peer->conf.flags & PEERFLAG_EVALUATE_ALL)
+                               rde_eval_all = 1;
+                       break;
+               case IMSG_SESSION_UP:
+                       if ((peer = peer_get(imsg.hdr.peerid)) == NULL) {
+                               log_warnx("%s: unknown peer id %d",
+                                   "IMSG_SESSION_UP", imsg.hdr.peerid);
+                               break;
+                       }
+                       if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(sup))
                                fatalx("incorrect size of session request");
-                       memcpy(&pconf, imsg.data, sizeof(pconf));
-                       peer_add(imsg.hdr.peerid, &pconf);
+                       memcpy(&sup, imsg.data, sizeof(sup));
+                       peer_up(peer, &sup);
                        /* make sure rde_eval_all is on if needed. */
-                       if (pconf.flags & PEERFLAG_EVALUATE_ALL)
+                       if (peer_has_add_path(peer, AID_UNSPEC, CAPA_AP_SEND))
                                rde_eval_all = 1;
                        break;
+               case IMSG_SESSION_DOWN:
+                       if ((peer = peer_get(imsg.hdr.peerid)) == NULL) {
+                               log_warnx("%s: unknown peer id %d",
+                                   "IMSG_SESSION_DOWN", imsg.hdr.peerid);
+                               break;
+                       }
+                       peer_down(peer, NULL);
+                       break;
+               case IMSG_SESSION_STALE:
+               case IMSG_SESSION_NOGRACE:
+               case IMSG_SESSION_FLUSH:
+               case IMSG_SESSION_RESTARTED:
+                       if ((peer = peer_get(imsg.hdr.peerid)) == NULL) {
+                               log_warnx("%s: unknown peer id %d",
+                                   "graceful restart", imsg.hdr.peerid);
+                               break;
+                       }
+                       if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(aid)) {
+                               log_warnx("%s: wrong imsg len", __func__);
+                               break;
+                       }
+                       memcpy(&aid, imsg.data, sizeof(aid));
+                       if (aid >= AID_MAX) {
+                               log_warnx("%s: bad AID", __func__);
+                               break;
+                       }
+
+                       switch (imsg.hdr.type) {
+                       case IMSG_SESSION_STALE:
+                       case IMSG_SESSION_NOGRACE:
+                               peer_stale(peer, aid,
+                                   imsg.hdr.type == IMSG_SESSION_NOGRACE);
+                               break;
+                       case IMSG_SESSION_FLUSH:
+                               peer_flush(peer, aid, peer->staletime[aid]);
+                               break;
+                       case IMSG_SESSION_RESTARTED:
+                               if (peer->staletime[aid])
+                                       peer_flush(peer, aid,
+                                           peer->staletime[aid]);
+                               break;
+                       }
+                       break;
                case IMSG_NETWORK_ADD:
                        if (imsg.hdr.len - IMSG_HEADER_SIZE !=
                            sizeof(struct network_config)) {
@@ -1069,9 +1122,7 @@ void
 rde_dispatch_imsg_peer(struct rde_peer *peer, void *bula)
 {
        struct route_refresh rr;
-       struct session_up sup;
        struct imsg imsg;
-       uint8_t aid;
 
        if (!peer_imsg_pop(peer, &imsg))
                return;
@@ -1081,48 +1132,6 @@ rde_dispatch_imsg_peer(struct rde_peer *
                if (peer->state != PEER_UP)
                        break;
                rde_update_dispatch(peer, &imsg);
-               break;
-       case IMSG_SESSION_UP:
-               if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(sup))
-                       fatalx("incorrect size of session request");
-               memcpy(&sup, imsg.data, sizeof(sup));
-               if (peer_up(peer, &sup) == -1) {
-                       peer->state = PEER_DOWN;
-                       imsg_compose(ibuf_se, IMSG_SESSION_DOWN, peer->conf.id,
-                           0, -1, NULL, 0);
-               }
-               /* make sure rde_eval_all is on if needed. */
-               if (peer_has_add_path(peer, AID_UNSPEC, CAPA_AP_SEND))
-                       rde_eval_all = 1;
-               break;
-       case IMSG_SESSION_DOWN:
-               peer_down(peer, NULL);
-               break;
-       case IMSG_SESSION_STALE:
-       case IMSG_SESSION_FLUSH:
-       case IMSG_SESSION_RESTARTED:
-               if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(aid)) {
-                       log_warnx("%s: wrong imsg len", __func__);
-                       break;
-               }
-               memcpy(&aid, imsg.data, sizeof(aid));
-               if (aid >= AID_MAX) {
-                       log_warnx("%s: bad AID", __func__);
-                       break;
-               }
-
-               switch (imsg.hdr.type) {
-               case IMSG_SESSION_STALE:
-                       peer_stale(peer, aid);
-                       break;
-               case IMSG_SESSION_FLUSH:
-                       peer_flush(peer, aid, peer->staletime[aid]);
-                       break;
-               case IMSG_SESSION_RESTARTED:
-                       if (peer->staletime[aid])
-                               peer_flush(peer, aid, peer->staletime[aid]);
-                       break;
-               }
                break;
        case IMSG_REFRESH:
                if (imsg.hdr.len - IMSG_HEADER_SIZE != sizeof(rr)) {
Index: rde.h
===================================================================
RCS file: /cvs/src/usr.sbin/bgpd/rde.h,v
retrieving revision 1.262
diff -u -p -r1.262 rde.h
--- rde.h       3 Aug 2022 08:56:23 -0000       1.262
+++ rde.h       26 Aug 2022 08:24:49 -0000
@@ -418,10 +418,10 @@ struct rde_peer   *peer_get(uint32_t);
 struct rde_peer *peer_match(struct ctl_neighbor *, uint32_t);
 struct rde_peer        *peer_add(uint32_t, struct peer_config *);
 
-int             peer_up(struct rde_peer *, struct session_up *);
+void            peer_up(struct rde_peer *, struct session_up *);
 void            peer_down(struct rde_peer *, void *);
 void            peer_flush(struct rde_peer *, uint8_t, time_t);
-void            peer_stale(struct rde_peer *, uint8_t);
+void            peer_stale(struct rde_peer *, uint8_t, int);
 void            peer_dump(struct rde_peer *, uint8_t);
 void            peer_begin_rrefresh(struct rde_peer *, uint8_t);
 
Index: rde_peer.c
===================================================================
RCS file: /cvs/src/usr.sbin/bgpd/rde_peer.c,v
retrieving revision 1.21
diff -u -p -r1.21 rde_peer.c
--- rde_peer.c  17 Aug 2022 15:15:26 -0000      1.21
+++ rde_peer.c  26 Aug 2022 08:24:49 -0000
@@ -194,7 +194,7 @@ peer_add(uint32_t id, struct peer_config
 
        if ((peer = peer_get(id))) {
                memcpy(&peer->conf, p_conf, sizeof(struct peer_config));
-               return (NULL);
+               return (peer);
        }
 
        peer = calloc(1, sizeof(struct rde_peer));
@@ -408,7 +408,7 @@ rde_up_dump_done(void *ptr, uint8_t aid)
 /*
  * Session got established, bring peer up, load RIBs do initial table dump.
  */
-int
+void
 peer_up(struct rde_peer *peer, struct session_up *sup)
 {
        uint8_t  i;
@@ -418,6 +418,8 @@ peer_up(struct rde_peer *peer, struct se
                 * There is a race condition when doing PEER_ERR -> PEER_DOWN.
                 * So just do a full reset of the peer here.
                 */
+               rib_dump_terminate(peer);
+               peer_imsg_flush(peer);
                if (prefix_dump_new(peer, AID_UNSPEC, 0, NULL,
                    peer_adjout_clear_upcall, NULL, NULL) == -1)
                        fatal("%s: prefix_dump_new", __func__);
@@ -448,8 +450,6 @@ peer_up(struct rde_peer *peer, struct se
                if (peer->capa.mp[i])
                        peer_dump(peer, i);
        }
-
-       return (0);
 }
 
 /*
@@ -461,8 +461,12 @@ peer_down(struct rde_peer *peer, void *b
 {
        peer->remote_bgpid = 0;
        peer->state = PEER_DOWN;
-       /* stop all pending dumps which may depend on this peer */
+       /*
+        * stop all pending dumps which may depend on this peer
+        * and flush all pending imsg from the SE.
+        */
        rib_dump_terminate(peer);
+       peer_imsg_flush(peer);
 
        /* flush Adj-RIB-Out */
        if (prefix_dump_new(peer, AID_UNSPEC, 0, NULL,
@@ -474,8 +478,6 @@ peer_down(struct rde_peer *peer, void *b
        peer->prefix_cnt = 0;
        peer->prefix_out_cnt = 0;
 
-       peer_imsg_flush(peer);
-
        LIST_REMOVE(peer, hash_l);
        LIST_REMOVE(peer, peer_l);
        free(peer);
@@ -511,7 +513,7 @@ peer_flush(struct rde_peer *peer, uint8_
  * is set to the current timestamp for identifying stale routes in Adj-RIB-In.
  */
 void
-peer_stale(struct rde_peer *peer, uint8_t aid)
+peer_stale(struct rde_peer *peer, uint8_t aid, int flushall)
 {
        time_t now;
 
@@ -522,9 +524,19 @@ peer_stale(struct rde_peer *peer, uint8_
        peer->staletime[aid] = now = getmonotime();
        peer->state = PEER_DOWN;
 
+       /*
+        * stop all pending dumps which may depend on this peer
+        * and flush all pending imsg from the SE.
+        */
+       rib_dump_terminate(peer);
+       peer_imsg_flush(peer);
+
+       if (flushall)
+               peer_flush(peer, aid, 0);
+
        /* XXX this is not quite correct */
        /* mark Adj-RIB-Out stale for this peer */
-       if (prefix_dump_new(peer, AID_UNSPEC, 0, NULL,
+       if (prefix_dump_new(peer, aid, 0, NULL,
            peer_adjout_stale_upcall, NULL, NULL) == -1)
                fatal("%s: prefix_dump_new", __func__);
 
Index: session.c
===================================================================
RCS file: /cvs/src/usr.sbin/bgpd/session.c,v
retrieving revision 1.433
diff -u -p -r1.433 session.c
--- session.c   17 Aug 2022 15:15:26 -0000      1.433
+++ session.c   26 Aug 2022 08:24:49 -0000
@@ -1760,7 +1760,7 @@ session_graceful_restart(struct peer *p)
                            aid2str(i));
                        p->capa.neg.grestart.flags[i] |= CAPA_GR_RESTARTING;
                } else if (p->capa.neg.mp[i]) {
-                       if (imsg_rde(IMSG_SESSION_FLUSH, p->conf.id,
+                       if (imsg_rde(IMSG_SESSION_NOGRACE, p->conf.id,
                            &i, sizeof(i)) == -1)
                                return (-1);
                        log_peer_warnx(&p->conf,

Reply via email to