Both bgpctl and BGP neighbors are often not fast enough to keep up with
the RDE. The result is quite a bit of memory bloat, or some ugly
workarounds for bgpctl that can starve other bgpctl calls to death.

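This implements a simple XON/XOFF protocol for peers and control
sessions and helps reduce the pain on busy boxes: when the session
engine's output queue for a peer or control connection grows past a high
water mark, it sends IMSG_XOFF to the RDE, and once that queue drains
below a low water mark it sends IMSG_XON so the RDE produces messages
again. It is a first step. Some major changes are still needed to reduce
the update overhead seen when many sessions start up at the same time.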

I would love to hear from people with larger setups if they run into
any problems.
-- 
:wq Claudio


Index: bgpd.h
===================================================================
RCS file: /cvs/src/usr.sbin/bgpd/bgpd.h,v
retrieving revision 1.300
diff -u -p -r1.300 bgpd.h
--- bgpd.h      25 Jan 2017 00:11:07 -0000      1.300
+++ bgpd.h      25 Jan 2017 04:22:34 -0000
@@ -87,13 +87,17 @@
 #define        F_RTLABEL               0x10000
 
 /*
- * Limit the number of control messages generated by the RDE and queued in
- * session engine. The RDE limit defines how many imsg are generated in
- * one poll round. Then if the SE limit is hit the RDE control socket will no
- * longer be polled.
+ * Limit the number of messages queued in the session engine.
+ * The SE will send an IMSG_XOFF message to the RDE if the high water mark
+ * is reached. The RDE should then throttle this peer or control connection.
+ * Once the message queue in the SE drops below the low water mark an
+ * IMSG_XON message will be sent and the RDE will produce more messages again.
  */
 #define RDE_RUNNER_ROUNDS      100
-#define SESSION_CTL_QUEUE_MAX  10000
+#define SESS_MSG_HIGH_MARK     300
+#define SESS_MSG_LOW_MARK      50
+#define CTL_MSG_HIGH_MARK      500
+#define CTL_MSG_LOW_MARK       100
 
 enum bgpd_process {
        PROC_MAIN,
@@ -425,7 +429,9 @@ enum imsg_type {
        IMSG_PFTABLE_COMMIT,
        IMSG_REFRESH,
        IMSG_IFINFO,
-       IMSG_DEMOTE
+       IMSG_DEMOTE,
+       IMSG_XON,
+       IMSG_XOFF
 };
 
 struct demote_msg {
Index: control.c
===================================================================
RCS file: /cvs/src/usr.sbin/bgpd/control.c,v
retrieving revision 1.87
diff -u -p -r1.87 control.c
--- control.c   13 Feb 2017 14:48:44 -0000      1.87
+++ control.c   16 Feb 2017 19:20:23 -0000
@@ -213,11 +213,16 @@ control_dispatch_msg(struct pollfd *pfd,
                return (0);
        }
 
-       if (pfd->revents & POLLOUT)
+       if (pfd->revents & POLLOUT) {
                if (msgbuf_write(&c->ibuf.w) <= 0 && errno != EAGAIN) {
                        *ctl_cnt -= control_close(pfd->fd);
                        return (1);
                }
+               if (c->throttled && c->ibuf.w.queued < CTL_MSG_LOW_MARK) {
+                       if (imsg_ctl_rde(IMSG_XON, c->ibuf.pid, NULL, 0) != -1)
+                               c->throttled = 0;
+               }
+       }
 
        if (!(pfd->revents & POLLIN))
                return (0);
@@ -521,6 +526,11 @@ control_imsg_relay(struct imsg *imsg)
 
        if ((c = control_connbypid(imsg->hdr.pid)) == NULL)
                return (0);
+
+       if (!c->throttled && c->ibuf.w.queued > CTL_MSG_HIGH_MARK) {
+               if (imsg_ctl_rde(IMSG_XOFF, imsg->hdr.pid, NULL, 0) != -1)
+                       c->throttled = 1;
+       }
 
        return (imsg_compose(&c->ibuf, imsg->hdr.type, 0, imsg->hdr.pid, -1,
            imsg->data, imsg->hdr.len - IMSG_HEADER_SIZE));
Index: rde.c
===================================================================
RCS file: /cvs/src/usr.sbin/bgpd/rde.c,v
retrieving revision 1.361
diff -u -p -r1.361 rde.c
--- rde.c       25 Jan 2017 03:21:55 -0000      1.361
+++ rde.c       26 May 2017 18:57:51 -0000
@@ -76,7 +76,7 @@ void           rde_update_log(const char *, u_in
 void            rde_as4byte_fixup(struct rde_peer *, struct rde_aspath *);
 void            rde_reflector(struct rde_peer *, struct rde_aspath *);
 
-void            rde_dump_rib_as(struct prefix *, struct rde_aspath *,pid_t,
+void            rde_dump_rib_as(struct prefix *, struct rde_aspath *, pid_t,
                     int);
 void            rde_dump_filter(struct prefix *,
                     struct ctl_show_rib_request *);
@@ -86,8 +86,14 @@ void          rde_dump_upcall(struct rib_entry 
 void            rde_dump_prefix_upcall(struct rib_entry *, void *);
 void            rde_dump_ctx_new(struct ctl_show_rib_request *, pid_t,
                     enum imsg_type);
-void            rde_dump_mrt_new(struct mrt *, pid_t, int);
+void            rde_dump_ctx_throttle(pid_t pid, int throttle);
+void            rde_dump_runner(void);
+int             rde_dump_pending(void);
 void            rde_dump_done(void *);
+void            rde_dump_mrt_new(struct mrt *, pid_t, int);
+void            rde_dump_rib_free(struct rib *);
+void            rde_dump_mrt_free(struct rib *);
+void            rde_rib_free(struct rib_desc *);
 
 int             rde_rdomain_import(struct rde_aspath *, struct rdomain *);
 void            rde_reload_done(void);
@@ -131,15 +137,19 @@ struct imsgbuf            *ibuf_main;
 struct rde_memstats     rdemem;
 
 struct rde_dump_ctx {
+       LIST_ENTRY(rde_dump_ctx)        entry;
        struct rib_context              ribctx;
        struct ctl_show_rib_request     req;
        sa_family_t                     af;
+       u_int8_t                        throttled;
 };
 
+LIST_HEAD(, rde_dump_ctx) rde_dump_h = LIST_HEAD_INITIALIZER(rde_dump_h);
+
 struct rde_mrt_ctx {
-       struct mrt              mrt;
-       struct rib_context      ribctx;
        LIST_ENTRY(rde_mrt_ctx) entry;
+       struct rib_context      ribctx;
+       struct mrt              mrt;
 };
 
 LIST_HEAD(, rde_mrt_ctx) rde_mrts = LIST_HEAD_INITIALIZER(rde_mrts);
@@ -248,7 +258,7 @@ rde_main(int debug, int verbose)
                set_pollfd(&pfd[PFD_PIPE_SESSION], ibuf_se);
                set_pollfd(&pfd[PFD_PIPE_SESSION_CTL], ibuf_se_ctl);
 
-               if (rib_dump_pending() &&
+               if (rde_dump_pending() &&
                    ibuf_se_ctl && ibuf_se_ctl->w.queued == 0)
                        timeout = 0;
 
@@ -261,7 +271,6 @@ rde_main(int debug, int verbose)
                                i++;
                        } else if (mctx->mrt.state == MRT_STATE_REMOVE) {
                                close(mctx->mrt.wbuf.fd);
-                               LIST_REMOVE(&mctx->ribctx, entry);
                                LIST_REMOVE(mctx, entry);
                                free(mctx);
                                rde_mrt_cnt--;
@@ -307,9 +316,9 @@ rde_main(int debug, int verbose)
                rde_update_queue_runner();
                for (aid = AID_INET6; aid < AID_MAX; aid++)
                        rde_update6_queue_runner(aid);
-               if (rib_dump_pending() &&
+               if (rde_dump_pending() &&
                    ibuf_se_ctl && ibuf_se_ctl->w.queued <= 10)
-                       rib_dump_runner();
+                       rde_dump_runner();
        }
 
        /* close pipes */
@@ -334,7 +343,6 @@ rde_main(int debug, int verbose)
        while ((mctx = LIST_FIRST(&rde_mrts)) != NULL) {
                msgbuf_clear(&mctx->mrt.wbuf);
                close(mctx->mrt.wbuf.fd);
-               LIST_REMOVE(&mctx->ribctx, entry);
                LIST_REMOVE(mctx, entry);
                free(mctx);
        }
@@ -611,6 +619,23 @@ badnet:
                        memcpy(&verbose, imsg.data, sizeof(verbose));
                        log_setverbose(verbose);
                        break;
+               case IMSG_XON:
+                       if (imsg.hdr.peerid) {
+                               peer = peer_get(imsg.hdr.peerid);
+                               if (peer)
+                                       peer->throttled = 0;
+                               break;
+                       } else
+                               rde_dump_ctx_throttle(imsg.hdr.pid, 0);
+                       break;
+               case IMSG_XOFF:
+                       if (imsg.hdr.peerid) {
+                               peer = peer_get(imsg.hdr.peerid);
+                               if (peer)
+                                       peer->throttled = 1;
+                       } else
+                               rde_dump_ctx_throttle(imsg.hdr.pid, 1);
+                       break;
                default:
                        break;
                }
@@ -737,7 +762,7 @@ rde_dispatch_imsg_parent(struct imsgbuf 
                                 */
                                in_rules = ribd->in_rules;
                                ribd->in_rules = NULL;
-                               rib_free(rib);
+                               rde_rib_free(ribd);
                                rib = rib_new(rn.name, rn.rtableid, rn.flags);
                                ribd->in_rules = in_rules;
                        } else
@@ -2356,7 +2381,7 @@ rde_dump_ctx_new(struct ctl_show_rib_req
        memcpy(&ctx->req, req, sizeof(struct ctl_show_rib_request));
        ctx->req.pid = pid;
        ctx->req.type = type;
-       ctx->ribctx.ctx_count = RDE_RUNNER_ROUNDS;
+       ctx->ribctx.ctx_count = CTL_MSG_HIGH_MARK;
        ctx->ribctx.ctx_rib = rib;
        switch (ctx->req.type) {
        case IMSG_CTL_SHOW_NETWORK:
@@ -2398,20 +2423,72 @@ rde_dump_ctx_new(struct ctl_show_rib_req
        ctx->ribctx.ctx_done = rde_dump_done;
        ctx->ribctx.ctx_arg = ctx;
        ctx->ribctx.ctx_aid = ctx->req.aid;
+       LIST_INSERT_HEAD(&rde_dump_h, ctx, entry);
        rib_dump_r(&ctx->ribctx);
 }
 
 void
+rde_dump_ctx_throttle(pid_t pid, int throttle)
+{
+       struct rde_dump_ctx     *ctx;
+
+       LIST_FOREACH(ctx, &rde_dump_h, entry) {
+               if (ctx->req.pid == pid) {
+                       ctx->throttled = throttle;
+                       return;
+               }
+       }
+}
+
+void
+rde_dump_runner(void)
+{
+       struct rde_dump_ctx     *ctx, *next;
+
+       for (ctx = LIST_FIRST(&rde_dump_h); ctx != NULL; ctx = next) {
+               next = LIST_NEXT(ctx, entry);
+               if (!ctx->throttled)
+                       rib_dump_r(&ctx->ribctx);
+       }
+}
+
+int
+rde_dump_pending(void)
+{
+       struct rde_dump_ctx     *ctx;
+
+       /* return true if there is at least one unthrottled context */
+       LIST_FOREACH(ctx, &rde_dump_h, entry)
+               if (!ctx->throttled)
+                       return (1);
+
+       return (0);
+}
+
+void
 rde_dump_done(void *arg)
 {
        struct rde_dump_ctx     *ctx = arg;
 
        imsg_compose(ibuf_se_ctl, IMSG_CTL_END, 0, ctx->req.pid,
            -1, NULL, 0);
+       LIST_REMOVE(ctx, entry);
        free(ctx);
 }
 
 void
+rde_dump_rib_free(struct rib *rib)
+{
+       struct rde_dump_ctx     *ctx, *next;
+
+       for (ctx = LIST_FIRST(&rde_dump_h); ctx != NULL; ctx = next) {
+               next = LIST_NEXT(ctx, entry);
+               if (ctx->ribctx.ctx_rib == rib)
+                       rde_dump_done(ctx);
+       }
+}
+
+void
 rde_dump_mrt_new(struct mrt *mrt, pid_t pid, int fd)
 {
        struct rde_mrt_ctx      *ctx;
@@ -2435,7 +2512,7 @@ rde_dump_mrt_new(struct mrt *mrt, pid_t 
        if (ctx->mrt.type == MRT_TABLE_DUMP_V2)
                mrt_dump_v2_hdr(&ctx->mrt, conf, &peerlist);
 
-       ctx->ribctx.ctx_count = RDE_RUNNER_ROUNDS;
+       ctx->ribctx.ctx_count = CTL_MSG_HIGH_MARK;
        ctx->ribctx.ctx_rib = rib;
        ctx->ribctx.ctx_upcall = mrt_dump_upcall;
        ctx->ribctx.ctx_done = mrt_done;
@@ -2446,6 +2523,28 @@ rde_dump_mrt_new(struct mrt *mrt, pid_t 
        rib_dump_r(&ctx->ribctx);
 }
 
+void
+rde_dump_mrt_free(struct rib *rib)
+{
+       struct rde_mrt_ctx      *ctx, *next;
+
+       for (ctx = LIST_FIRST(&rde_mrts); ctx != NULL; ctx = next) {
+               next = LIST_NEXT(ctx, entry);
+               if (ctx->ribctx.ctx_rib == rib)
+                       mrt_done(&ctx->mrt);
+       }
+}
+
+void
+rde_rib_free(struct rib_desc *rd)
+{
+       /* abort pending rib_dumps */
+       rde_dump_rib_free(&rd->rib);
+       rde_dump_mrt_free(&rd->rib);
+
+       rib_free(&rd->rib);
+}
+
 /*
  * kroute specific functions
  */
@@ -2847,7 +2946,7 @@ rde_reload_done(void)
 
                switch (ribs[rid].state) {
                case RECONF_DELETE:
-                       rib_free(&ribs[rid].rib);
+                       rde_rib_free(&ribs[rid]);
                        break;
                case RECONF_KEEP:
                        if (rde_filter_equal(ribs[rid].in_rules,
Index: rde.h
===================================================================
RCS file: /cvs/src/usr.sbin/bgpd/rde.h,v
retrieving revision 1.160
diff -u -p -r1.160 rde.h
--- rde.h       25 Jan 2017 03:21:55 -0000      1.160
+++ rde.h       25 Jan 2017 04:22:34 -0000
@@ -81,6 +81,7 @@ struct rde_peer {
        u_int16_t                        mrt_idx;
        u_int8_t                         reconf_out;    /* out filter changed */
        u_int8_t                         reconf_rib;    /* rib changed */
+       u_int8_t                         throttled;
 };
 
 #define AS_SET                 1
@@ -267,7 +268,6 @@ struct pt_entry_vpn4 {
 };
 
 struct rib_context {
-       LIST_ENTRY(rib_context)          entry;
        struct rib_entry                *ctx_re;
        struct rib                      *ctx_rib;
        void            (*ctx_upcall)(struct rib_entry *, void *);
@@ -436,8 +436,6 @@ struct rib_entry *rib_lookup(struct rib 
 void            rib_dump(struct rib *, void (*)(struct rib_entry *, void *),
                     void *, u_int8_t);
 void            rib_dump_r(struct rib_context *);
-void            rib_dump_runner(void);
-int             rib_dump_pending(void);
 
 static inline struct rib *
 re_rib(struct rib_entry *re)
Index: rde_rib.c
===================================================================
RCS file: /cvs/src/usr.sbin/bgpd/rde_rib.c,v
retrieving revision 1.153
diff -u -p -r1.153 rde_rib.c
--- rde_rib.c   25 Jan 2017 03:21:55 -0000      1.153
+++ rde_rib.c   26 May 2017 18:08:29 -0000
@@ -38,8 +38,6 @@
 u_int16_t rib_size;
 struct rib_desc *ribs;
 
-LIST_HEAD(, rib_context) rib_dump_h = LIST_HEAD_INITIALIZER(rib_dump_h);
-
 struct rib_entry *rib_add(struct rib *, struct bgpd_addr *, int);
 int rib_compare(const struct rib_entry *, const struct rib_entry *);
 void rib_remove(struct rib_entry *);
@@ -136,25 +134,10 @@ rib_desc(struct rib *rib)
 void
 rib_free(struct rib *rib)
 {
-       struct rib_context *ctx, *next;
        struct rib_desc *rd;
        struct rib_entry *re, *xre;
        struct prefix *p, *np;
 
-       /* abort pending rib_dumps */
-       for (ctx = LIST_FIRST(&rib_dump_h); ctx != NULL; ctx = next) {
-               next = LIST_NEXT(ctx, entry);
-               if (ctx->ctx_rib == rib) {
-                       re = ctx->ctx_re;
-                       re_unlock(re);
-                       LIST_REMOVE(ctx, entry);
-                       if (ctx->ctx_done)
-                               ctx->ctx_done(ctx->ctx_arg);
-                       else
-                               free(ctx);
-               }
-       }
-
        for (re = RB_MIN(rib_tree, rib_tree(rib)); re != NULL; re = xre) {
                xre = RB_NEXT(rib_tree, rib_tree(rib), re);
 
@@ -311,10 +294,9 @@ rib_dump_r(struct rib_context *ctx)
        struct rib_entry        *re;
        unsigned int             i;
 
-       if (ctx->ctx_re == NULL) {
+       if (ctx->ctx_re == NULL)
                re = RB_MIN(rib_tree, rib_tree(ctx->ctx_rib));
-               LIST_INSERT_HEAD(&rib_dump_h, ctx, entry);
-       } else
+       else
                re = rib_restart(ctx);
 
        for (i = 0; re != NULL; re = RB_NEXT(rib_tree, unused, re)) {
@@ -322,7 +304,7 @@ rib_dump_r(struct rib_context *ctx)
                    ctx->ctx_aid != re->prefix->aid)
                        continue;
                if (ctx->ctx_count && i++ >= ctx->ctx_count &&
-                   re_is_locked(re)) {
+                   !re_is_locked(re)) {
                        /* store and lock last element */
                        ctx->ctx_re = re;
                        re_lock(re);
@@ -331,7 +313,6 @@ rib_dump_r(struct rib_context *ctx)
                ctx->ctx_upcall(re, ctx->ctx_arg);
        }
 
-       LIST_REMOVE(ctx, entry);
        if (ctx->ctx_done)
                ctx->ctx_done(ctx->ctx_arg);
        else
@@ -355,23 +336,6 @@ rib_restart(struct rib_context *ctx)
                rib_remove(ctx->ctx_re);
        ctx->ctx_re = NULL;
        return (re);
-}
-
-void
-rib_dump_runner(void)
-{
-       struct rib_context      *ctx, *next;
-
-       for (ctx = LIST_FIRST(&rib_dump_h); ctx != NULL; ctx = next) {
-               next = LIST_NEXT(ctx, entry);
-               rib_dump_r(ctx);
-       }
-}
-
-int
-rib_dump_pending(void)
-{
-       return (!LIST_EMPTY(&rib_dump_h));
 }
 
 /* used to bump correct prefix counters */
Index: session.c
===================================================================
RCS file: /cvs/src/usr.sbin/bgpd/session.c,v
retrieving revision 1.359
diff -u -p -r1.359 session.c
--- session.c   13 Feb 2017 14:48:44 -0000      1.359
+++ session.c   26 May 2017 18:58:46 -0000
@@ -195,7 +195,6 @@ session_main(int debug, int verbose)
        u_int                    pfd_elms = 0, peer_l_elms = 0, mrt_l_elms = 0;
        u_int                    listener_cnt, ctl_cnt, mrt_cnt;
        u_int                    new_cnt;
-       u_int32_t                ctl_queued;
        struct passwd           *pw;
        struct peer             *p, **peer_l = NULL, *last, *next;
        struct mrt              *m, *xm, **mrt_l = NULL;
@@ -356,17 +355,7 @@ session_main(int debug, int verbose)
 
                set_pollfd(&pfd[PFD_PIPE_MAIN], ibuf_main);
                set_pollfd(&pfd[PFD_PIPE_ROUTE], ibuf_rde);
-
-               ctl_queued = 0;
-               TAILQ_FOREACH(ctl_conn, &ctl_conns, entry)
-                       ctl_queued += ctl_conn->ibuf.w.queued;
-
-               /*
-                * Do not act as unlimited buffer. Don't read in more
-                * messages if the ctl sockets are getting full.
-                */
-               if (ctl_queued < SESSION_CTL_QUEUE_MAX)
-                       set_pollfd(&pfd[PFD_PIPE_ROUTE_CTL], ibuf_rde_ctl);
+               set_pollfd(&pfd[PFD_PIPE_ROUTE_CTL], ibuf_rde_ctl);
 
                if (pauseaccept == 0) {
                        pfd[PFD_SOCK_CTL].fd = csock;
@@ -1389,6 +1378,13 @@ session_sendmsg(struct bgp_msg *msg, str
        }
 
        ibuf_close(&p->wbuf, msg->buf);
+       if (!p->throttled && p->wbuf.queued > SESS_MSG_HIGH_MARK) {
+               if (imsg_compose(ibuf_rde, IMSG_XOFF, p->conf.id, 0, -1,
+                   NULL, 0) == -1)
+                       log_peer_warn(&p->conf, "imsg_compose XOFF");
+               p->throttled = 1;
+       }
+
        free(msg);
        return (0);
 }
@@ -1773,6 +1769,12 @@ session_dispatch_msg(struct pollfd *pfd,
                                log_peer_warn(&p->conf, "write error");
                        bgp_fsm(p, EVNT_CON_FATAL);
                        return (1);
+               }
+               if (p->throttled && p->wbuf.queued < SESS_MSG_LOW_MARK) {
+                       if (imsg_compose(ibuf_rde, IMSG_XON, p->conf.id, 0, -1,
+                           NULL, 0) == -1)
+                               log_peer_warn(&p->conf, "imsg_compose XON");
+                       p->throttled = 0;
                }
                if (!(pfd->revents & POLLIN))
                        return (1);
Index: session.h
===================================================================
RCS file: /cvs/src/usr.sbin/bgpd/session.h,v
retrieving revision 1.122
diff -u -p -r1.122 session.h
--- session.h   13 Jan 2017 18:59:12 -0000      1.122
+++ session.h   19 Jan 2017 23:32:14 -0000
@@ -142,6 +142,7 @@ struct ctl_conn {
        TAILQ_ENTRY(ctl_conn)   entry;
        struct imsgbuf          ibuf;
        int                     restricted;
+       int                     throttled;
 };
 
 TAILQ_HEAD(ctl_conns, ctl_conn)        ctl_conns;
@@ -225,6 +226,7 @@ struct peer {
        u_int8_t                 depend_ok;
        u_int8_t                 demoted;
        u_int8_t                 passive;
+       u_int8_t                 throttled;
 };
 
 extern struct peer     *peers;

Reply via email to