Both bgpctl and bgp neighbors are often not fast enough to keep up with
the RDE. The result is quite a bit of memory bloat or some ugly
workarounds for bgpctl which can result in starving other bgpctl calls to
death.
This implements a simple XON / XOFF protocol for peers and control
sessions and helps reduce the pain on busy boxes. It is a first step.
Some major changes are still needed to reduce the update overhead
seen when many sessions start up at the same time.
I would love to hear from people with larger setups if there are any
problems.
--
:wq Claudio
Index: bgpd.h
===================================================================
RCS file: /cvs/src/usr.sbin/bgpd/bgpd.h,v
retrieving revision 1.300
diff -u -p -r1.300 bgpd.h
--- bgpd.h 25 Jan 2017 00:11:07 -0000 1.300
+++ bgpd.h 25 Jan 2017 04:22:34 -0000
@@ -87,13 +87,17 @@
#define F_RTLABEL 0x10000
/*
- * Limit the number of control messages generated by the RDE and queued in
- * session engine. The RDE limit defines how many imsg are generated in
- * one poll round. Then if the SE limit is hit the RDE control socket will no
- * longer be polled.
+ * Limit the number of messages queued in the session engine.
+ * The SE will send an IMSG_XOFF message to the RDE if the high water mark
+ * is reached. The RDE should then throttle this peer or control connection.
+ * Once the message queue in the SE drops below the low water mark an
+ * IMSG_XON message will be sent and the RDE will produce more messages again.
+ * SESS_MSG_* apply to the write queue of a BGP peer, CTL_MSG_* to the
+ * write queue of a control (bgpctl) connection.
*/
#define RDE_RUNNER_ROUNDS 100
-#define SESSION_CTL_QUEUE_MAX 10000
+#define SESS_MSG_HIGH_MARK 300
+#define SESS_MSG_LOW_MARK 50
+#define CTL_MSG_HIGH_MARK 500
+#define CTL_MSG_LOW_MARK 100
enum bgpd_process {
PROC_MAIN,
@@ -425,7 +429,9 @@ enum imsg_type {
IMSG_PFTABLE_COMMIT,
IMSG_REFRESH,
IMSG_IFINFO,
- IMSG_DEMOTE
+ IMSG_DEMOTE,
+ IMSG_XON,
+ IMSG_XOFF
};
struct demote_msg {
Index: control.c
===================================================================
RCS file: /cvs/src/usr.sbin/bgpd/control.c,v
retrieving revision 1.87
diff -u -p -r1.87 control.c
--- control.c 13 Feb 2017 14:48:44 -0000 1.87
+++ control.c 16 Feb 2017 19:20:23 -0000
@@ -213,11 +213,16 @@ control_dispatch_msg(struct pollfd *pfd,
return (0);
}
- if (pfd->revents & POLLOUT)
+ if (pfd->revents & POLLOUT) {
if (msgbuf_write(&c->ibuf.w) <= 0 && errno != EAGAIN) {
*ctl_cnt -= control_close(pfd->fd);
return (1);
}
+ /*
+ * The write queue drained below the low water mark: tell the
+ * RDE to resume dumps for this connection. If the XON cannot
+ * be queued the flag stays set, so it is retried the next time
+ * this write path runs.
+ */
+ if (c->throttled && c->ibuf.w.queued < CTL_MSG_LOW_MARK) {
+ if (imsg_ctl_rde(IMSG_XON, c->ibuf.pid, NULL, 0) != -1)
+ c->throttled = 0;
+ }
+ }
if (!(pfd->revents & POLLIN))
return (0);
@@ -521,6 +526,11 @@ control_imsg_relay(struct imsg *imsg)
if ((c = control_connbypid(imsg->hdr.pid)) == NULL)
return (0);
+
+ /*
+ * Too many messages are queued for this client: ask the RDE to
+ * pause the dump that feeds it. The current message is still
+ * queued below; the flag is only set once the XOFF was queued.
+ * NOTE(review): if the connection is closed while throttled,
+ * make sure the RDE side is released (e.g. control_close()
+ * sending IMSG_XON). The RDE skips throttled dump contexts, so
+ * such a context would otherwise never finish or be freed.
+ */
+ if (!c->throttled && c->ibuf.w.queued > CTL_MSG_HIGH_MARK) {
+ if (imsg_ctl_rde(IMSG_XOFF, imsg->hdr.pid, NULL, 0) != -1)
+ c->throttled = 1;
+ }
return (imsg_compose(&c->ibuf, imsg->hdr.type, 0, imsg->hdr.pid, -1,
imsg->data, imsg->hdr.len - IMSG_HEADER_SIZE));
Index: rde.c
===================================================================
RCS file: /cvs/src/usr.sbin/bgpd/rde.c,v
retrieving revision 1.361
diff -u -p -r1.361 rde.c
--- rde.c 25 Jan 2017 03:21:55 -0000 1.361
+++ rde.c 26 May 2017 18:57:51 -0000
@@ -76,7 +76,7 @@ void rde_update_log(const char *, u_in
void rde_as4byte_fixup(struct rde_peer *, struct rde_aspath *);
void rde_reflector(struct rde_peer *, struct rde_aspath *);
-void rde_dump_rib_as(struct prefix *, struct rde_aspath *,pid_t,
+void rde_dump_rib_as(struct prefix *, struct rde_aspath *, pid_t,
int);
void rde_dump_filter(struct prefix *,
struct ctl_show_rib_request *);
@@ -86,8 +86,14 @@ void rde_dump_upcall(struct rib_entry
void rde_dump_prefix_upcall(struct rib_entry *, void *);
void rde_dump_ctx_new(struct ctl_show_rib_request *, pid_t,
enum imsg_type);
-void rde_dump_mrt_new(struct mrt *, pid_t, int);
+/* scheduling and IMSG_XON/XOFF handling of control dump contexts */
+void rde_dump_ctx_throttle(pid_t pid, int throttle);
+void rde_dump_runner(void);
+int rde_dump_pending(void);
void rde_dump_done(void *);
+/* MRT dumps and RIB teardown (aborting dumps that walk a RIB) */
+void rde_dump_mrt_new(struct mrt *, pid_t, int);
+void rde_dump_rib_free(struct rib *);
+void rde_dump_mrt_free(struct rib *);
+void rde_rib_free(struct rib_desc *);
int rde_rdomain_import(struct rde_aspath *, struct rdomain *);
void rde_reload_done(void);
@@ -131,15 +137,19 @@ struct imsgbuf *ibuf_main;
struct rde_memstats rdemem;
struct rde_dump_ctx {
+ LIST_ENTRY(rde_dump_ctx) entry; /* on rde_dump_h */
struct rib_context ribctx;
struct ctl_show_rib_request req;
sa_family_t af;
+ u_int8_t throttled; /* set by IMSG_XOFF, cleared by IMSG_XON */
};
+/* dump contexts in progress; unlinked again in rde_dump_done() */
+LIST_HEAD(, rde_dump_ctx) rde_dump_h = LIST_HEAD_INITIALIZER(rde_dump_h);
+
struct rde_mrt_ctx {
- struct mrt mrt;
- struct rib_context ribctx;
LIST_ENTRY(rde_mrt_ctx) entry;
+ struct rib_context ribctx;
+ struct mrt mrt;
};
LIST_HEAD(, rde_mrt_ctx) rde_mrts = LIST_HEAD_INITIALIZER(rde_mrts);
@@ -248,7 +258,13 @@ rde_main(int debug, int verbose)
set_pollfd(&pfd[PFD_PIPE_SESSION], ibuf_se);
set_pollfd(&pfd[PFD_PIPE_SESSION_CTL], ibuf_se_ctl);
- if (rib_dump_pending() &&
+ if (rde_dump_pending() &&
ibuf_se_ctl && ibuf_se_ctl->w.queued == 0)
timeout = 0;
+ /* a paused MRT dump whose output is flushed can continue now */
+ LIST_FOREACH(mctx, &rde_mrts, entry)
+ if (mctx->ribctx.ctx_re != NULL &&
+ mctx->mrt.state != MRT_STATE_REMOVE &&
+ mctx->mrt.wbuf.queued == 0)
+ timeout = 0;
@@ -261,7 +277,6 @@ rde_main(int debug, int verbose)
i++;
} else if (mctx->mrt.state == MRT_STATE_REMOVE) {
close(mctx->mrt.wbuf.fd);
- LIST_REMOVE(&mctx->ribctx, entry);
LIST_REMOVE(mctx, entry);
free(mctx);
rde_mrt_cnt--;
@@ -307,9 +322,19 @@ rde_main(int debug, int verbose)
rde_update_queue_runner();
for (aid = AID_INET6; aid < AID_MAX; aid++)
rde_update6_queue_runner(aid);
- if (rib_dump_pending() &&
+ if (rde_dump_pending() &&
ibuf_se_ctl && ibuf_se_ctl->w.queued <= 10)
- rib_dump_runner();
+ rde_dump_runner();
+ /*
+ * MRT dump contexts are not on rde_dump_h, so rde_dump_runner()
+ * does not resume them. Continue a paused MRT dump (ctx_re holds
+ * the locked resume point) once its output has been flushed.
+ * Contexts marked MRT_STATE_REMOVE are skipped; their RIB may
+ * already be gone (assumes mrt_done() sets that state -- confirm).
+ */
+ LIST_FOREACH(mctx, &rde_mrts, entry)
+ if (mctx->ribctx.ctx_re != NULL &&
+ mctx->mrt.state != MRT_STATE_REMOVE &&
+ mctx->mrt.wbuf.queued == 0)
+ rib_dump_r(&mctx->ribctx);
}
/* close pipes */
@@ -334,7 +359,6 @@ rde_main(int debug, int verbose)
while ((mctx = LIST_FIRST(&rde_mrts)) != NULL) {
msgbuf_clear(&mctx->mrt.wbuf);
close(mctx->mrt.wbuf.fd);
- LIST_REMOVE(&mctx->ribctx, entry);
LIST_REMOVE(mctx, entry);
free(mctx);
}
@@ -611,6 +619,29 @@ badnet:
memcpy(&verbose, imsg.data, sizeof(verbose));
log_setverbose(verbose);
break;
+ /*
+ * Flow control from the SE: a non-zero peerid names a BGP
+ * peer, otherwise hdr.pid names a control connection whose
+ * dump context is (un)throttled.
+ * NOTE(review): peer->throttled is only written here; confirm
+ * the update queue runners skip throttled peers.
+ */
+ case IMSG_XON:
+ if (imsg.hdr.peerid) {
+ peer = peer_get(imsg.hdr.peerid);
+ if (peer)
+ peer->throttled = 0;
+ } else
+ rde_dump_ctx_throttle(imsg.hdr.pid, 0);
+ break;
+ case IMSG_XOFF:
+ if (imsg.hdr.peerid) {
+ peer = peer_get(imsg.hdr.peerid);
+ if (peer)
+ peer->throttled = 1;
+ } else
+ rde_dump_ctx_throttle(imsg.hdr.pid, 1);
+ break;
default:
break;
}
@@ -737,7 +762,7 @@ rde_dispatch_imsg_parent(struct imsgbuf
*/
in_rules = ribd->in_rules;
ribd->in_rules = NULL;
- rib_free(rib);
+ /* abort dumps still walking this RIB, then free it */
+ rde_rib_free(ribd);
rib = rib_new(rn.name, rn.rtableid, rn.flags);
ribd->in_rules = in_rules;
} else
@@ -2356,7 +2381,7 @@ rde_dump_ctx_new(struct ctl_show_rib_req
memcpy(&ctx->req, req, sizeof(struct ctl_show_rib_request));
ctx->req.pid = pid;
ctx->req.type = type;
- ctx->ribctx.ctx_count = RDE_RUNNER_ROUNDS;
+ /* approximate number of entries dumped per rib_dump_r() call */
+ ctx->ribctx.ctx_count = CTL_MSG_HIGH_MARK;
ctx->ribctx.ctx_rib = rib;
switch (ctx->req.type) {
case IMSG_CTL_SHOW_NETWORK:
@@ -2398,20 +2423,72 @@ rde_dump_ctx_new(struct ctl_show_rib_req
ctx->ribctx.ctx_done = rde_dump_done;
ctx->ribctx.ctx_arg = ctx;
ctx->ribctx.ctx_aid = ctx->req.aid;
+ /* track ctx so rde_dump_runner() and IMSG_XON/XOFF can find it */
+ LIST_INSERT_HEAD(&rde_dump_h, ctx, entry);
rib_dump_r(&ctx->ribctx);
}
void
+rde_dump_ctx_throttle(pid_t pid, int throttle)
+{
+ struct rde_dump_ctx *ctx;
+
+ /*
+ * XON/XOFF is sent per control connection, so apply it to every
+ * dump context feeding that pid, not only the first one found.
+ */
+ LIST_FOREACH(ctx, &rde_dump_h, entry)
+ if (ctx->req.pid == pid)
+ ctx->throttled = throttle;
+}
+
+void
+rde_dump_runner(void)
+{
+ struct rde_dump_ctx *cur, *nxt;
+
+ /*
+ * Give every dump context that is not throttled another batch.
+ * rib_dump_r() may finish the dump and free the context through
+ * rde_dump_done(), so fetch the successor first.
+ */
+ cur = LIST_FIRST(&rde_dump_h);
+ while (cur != NULL) {
+ nxt = LIST_NEXT(cur, entry);
+ if (cur->throttled == 0)
+ rib_dump_r(&cur->ribctx);
+ cur = nxt;
+ }
+}
+
+int
+rde_dump_pending(void)
+{
+ struct rde_dump_ctx *cur;
+
+ /* work is pending as long as one context is not throttled */
+ for (cur = LIST_FIRST(&rde_dump_h); cur != NULL;
+ cur = LIST_NEXT(cur, entry)) {
+ if (cur->throttled == 0)
+ return (1);
+ }
+ return (0);
+}
+
+/*
+ * ctx_done callback of control dump contexts (also used to abort them):
+ * send IMSG_CTL_END to the client, unlink ctx from rde_dump_h, free it.
+ */
+void
rde_dump_done(void *arg)
{
struct rde_dump_ctx *ctx = arg;
imsg_compose(ibuf_se_ctl, IMSG_CTL_END, 0, ctx->req.pid,
-1, NULL, 0);
+ LIST_REMOVE(ctx, entry);
free(ctx);
}
void
+rde_dump_rib_free(struct rib *rib)
+{
+ struct rde_dump_ctx *ctx, *next;
+
+ /*
+ * Abort every control dump still walking rib, which is about to be
+ * freed; rde_dump_done() ends the dump for the client and frees ctx.
+ * NOTE(review): the entry parked in ctx->ribctx.ctx_re was locked by
+ * rib_dump_r() and is not unlocked here (the removed rib_free() code
+ * called re_unlock() first); confirm it is not leaked by rib_free().
+ */
+ for (ctx = LIST_FIRST(&rde_dump_h); ctx != NULL; ctx = next) {
+ next = LIST_NEXT(ctx, entry);
+ if (ctx->ribctx.ctx_rib == rib)
+ rde_dump_done(ctx);
+ }
+}
+
+void
rde_dump_mrt_new(struct mrt *mrt, pid_t pid, int fd)
{
struct rde_mrt_ctx *ctx;
@@ -2435,7 +2512,7 @@ rde_dump_mrt_new(struct mrt *mrt, pid_t
if (ctx->mrt.type == MRT_TABLE_DUMP_V2)
mrt_dump_v2_hdr(&ctx->mrt, conf, &peerlist);
- ctx->ribctx.ctx_count = RDE_RUNNER_ROUNDS;
+ /*
+ * NOTE(review): this context lives on rde_mrts, not rde_dump_h, so
+ * rde_dump_runner() never resumes it after the first batch below;
+ * confirm something else calls rib_dump_r() for paused MRT dumps.
+ */
+ ctx->ribctx.ctx_count = CTL_MSG_HIGH_MARK;
ctx->ribctx.ctx_rib = rib;
ctx->ribctx.ctx_upcall = mrt_dump_upcall;
ctx->ribctx.ctx_done = mrt_done;
@@ -2446,6 +2523,28 @@ rde_dump_mrt_new(struct mrt *mrt, pid_t
rib_dump_r(&ctx->ribctx);
}
+void
+rde_dump_mrt_free(struct rib *rib)
+{
+ struct rde_mrt_ctx *ctx, *next;
+
+ /*
+ * Stop every MRT dump walking rib. mrt_done() presumably only marks
+ * the dump as finished (rde_main() closes and frees contexts in
+ * MRT_STATE_REMOVE) -- TODO confirm.
+ * NOTE(review): the locked resume entry in ctx->ribctx.ctx_re is not
+ * released here.
+ */
+ for (ctx = LIST_FIRST(&rde_mrts); ctx != NULL; ctx = next) {
+ next = LIST_NEXT(ctx, entry);
+ if (ctx->ribctx.ctx_rib == rib)
+ mrt_done(&ctx->mrt);
+ }
+}
+
+void
+rde_rib_free(struct rib_desc *rd)
+{
+ struct rib *rib = &rd->rib;
+
+ /*
+ * rib_free() no longer knows about dump contexts, so stop every
+ * control and MRT dump walking this RIB before tearing it down.
+ */
+ rde_dump_rib_free(rib);
+ rde_dump_mrt_free(rib);
+ rib_free(rib);
+}
+
/*
* kroute specific functions
*/
@@ -2847,7 +2946,7 @@ rde_reload_done(void)
switch (ribs[rid].state) {
case RECONF_DELETE:
- rib_free(&ribs[rid].rib);
+ /* also aborts dumps still walking this RIB */
+ rde_rib_free(&ribs[rid]);
break;
case RECONF_KEEP:
if (rde_filter_equal(ribs[rid].in_rules,
Index: rde.h
===================================================================
RCS file: /cvs/src/usr.sbin/bgpd/rde.h,v
retrieving revision 1.160
diff -u -p -r1.160 rde.h
--- rde.h 25 Jan 2017 03:21:55 -0000 1.160
+++ rde.h 25 Jan 2017 04:22:34 -0000
@@ -81,6 +81,7 @@ struct rde_peer {
u_int16_t mrt_idx;
u_int8_t reconf_out; /* out filter changed */
u_int8_t reconf_rib; /* rib changed */
+ u_int8_t throttled; /* SE sent IMSG_XOFF */
};
#define AS_SET 1
@@ -267,7 +268,6 @@ struct pt_entry_vpn4 {
};
struct rib_context {
- LIST_ENTRY(rib_context) entry;
struct rib_entry *ctx_re;
struct rib *ctx_rib;
void (*ctx_upcall)(struct rib_entry *, void *);
@@ -436,8 +436,6 @@ struct rib_entry *rib_lookup(struct rib
void rib_dump(struct rib *, void (*)(struct rib_entry *, void *),
void *, u_int8_t);
void rib_dump_r(struct rib_context *);
-void rib_dump_runner(void);
-int rib_dump_pending(void);
static inline struct rib *
re_rib(struct rib_entry *re)
Index: rde_rib.c
===================================================================
RCS file: /cvs/src/usr.sbin/bgpd/rde_rib.c,v
retrieving revision 1.153
diff -u -p -r1.153 rde_rib.c
--- rde_rib.c 25 Jan 2017 03:21:55 -0000 1.153
+++ rde_rib.c 26 May 2017 18:08:29 -0000
@@ -38,8 +38,6 @@
u_int16_t rib_size;
struct rib_desc *ribs;
-LIST_HEAD(, rib_context) rib_dump_h = LIST_HEAD_INITIALIZER(rib_dump_h);
-
struct rib_entry *rib_add(struct rib *, struct bgpd_addr *, int);
int rib_compare(const struct rib_entry *, const struct rib_entry *);
void rib_remove(struct rib_entry *);
@@ -136,25 +134,10 @@ rib_desc(struct rib *rib)
void
rib_free(struct rib *rib)
{
- struct rib_context *ctx, *next;
struct rib_desc *rd;
struct rib_entry *re, *xre;
struct prefix *p, *np;
- /* abort pending rib_dumps */
- for (ctx = LIST_FIRST(&rib_dump_h); ctx != NULL; ctx = next) {
- next = LIST_NEXT(ctx, entry);
- if (ctx->ctx_rib == rib) {
- re = ctx->ctx_re;
- re_unlock(re);
- LIST_REMOVE(ctx, entry);
- if (ctx->ctx_done)
- ctx->ctx_done(ctx->ctx_arg);
- else
- free(ctx);
- }
- }
-
+ /* dump contexts are not tracked here; callers abort them first */
for (re = RB_MIN(rib_tree, rib_tree(rib)); re != NULL; re = xre) {
xre = RB_NEXT(rib_tree, rib_tree(rib), re);
@@ -311,10 +294,9 @@ rib_dump_r(struct rib_context *ctx)
struct rib_entry *re;
unsigned int i;
- if (ctx->ctx_re == NULL) {
+ /* ctx_re is NULL on the first call, else the locked resume point */
+ if (ctx->ctx_re == NULL)
re = RB_MIN(rib_tree, rib_tree(ctx->ctx_rib));
- LIST_INSERT_HEAD(&rib_dump_h, ctx, entry);
- } else
+ else
re = rib_restart(ctx);
for (i = 0; re != NULL; re = RB_NEXT(rib_tree, unused, re)) {
@@ -322,7 +304,7 @@ rib_dump_r(struct rib_context *ctx)
ctx->ctx_aid != re->prefix->aid)
continue;
+ /* after ctx_count entries, pause on an entry not yet locked */
if (ctx->ctx_count && i++ >= ctx->ctx_count &&
- re_is_locked(re)) {
+ !re_is_locked(re)) {
/* store and lock last element */
ctx->ctx_re = re;
re_lock(re);
@@ -331,7 +313,6 @@ rib_dump_r(struct rib_context *ctx)
ctx->ctx_upcall(re, ctx->ctx_arg);
}
- LIST_REMOVE(ctx, entry);
if (ctx->ctx_done)
ctx->ctx_done(ctx->ctx_arg);
else
@@ -355,23 +336,6 @@ rib_restart(struct rib_context *ctx)
rib_remove(ctx->ctx_re);
ctx->ctx_re = NULL;
return (re);
-}
-
-void
-rib_dump_runner(void)
-{
- struct rib_context *ctx, *next;
-
- for (ctx = LIST_FIRST(&rib_dump_h); ctx != NULL; ctx = next) {
- next = LIST_NEXT(ctx, entry);
- rib_dump_r(ctx);
- }
-}
-
-int
-rib_dump_pending(void)
-{
- return (!LIST_EMPTY(&rib_dump_h));
}
/* used to bump correct prefix counters */
Index: session.c
===================================================================
RCS file: /cvs/src/usr.sbin/bgpd/session.c,v
retrieving revision 1.359
diff -u -p -r1.359 session.c
--- session.c 13 Feb 2017 14:48:44 -0000 1.359
+++ session.c 26 May 2017 18:58:46 -0000
@@ -195,7 +195,6 @@ session_main(int debug, int verbose)
u_int pfd_elms = 0, peer_l_elms = 0, mrt_l_elms = 0;
u_int listener_cnt, ctl_cnt, mrt_cnt;
u_int new_cnt;
- u_int32_t ctl_queued;
struct passwd *pw;
struct peer *p, **peer_l = NULL, *last, *next;
struct mrt *m, *xm, **mrt_l = NULL;
@@ -356,17 +355,7 @@ session_main(int debug, int verbose)
set_pollfd(&pfd[PFD_PIPE_MAIN], ibuf_main);
set_pollfd(&pfd[PFD_PIPE_ROUTE], ibuf_rde);
-
- ctl_queued = 0;
- TAILQ_FOREACH(ctl_conn, &ctl_conns, entry)
- ctl_queued += ctl_conn->ibuf.w.queued;
-
- /*
- * Do not act as unlimited buffer. Don't read in more
- * messages if the ctl sockets are getting full.
- */
- if (ctl_queued < SESSION_CTL_QUEUE_MAX)
- set_pollfd(&pfd[PFD_PIPE_ROUTE_CTL], ibuf_rde_ctl);
+ /* per-connection IMSG_XON/XOFF limits the RDE; always poll */
+ set_pollfd(&pfd[PFD_PIPE_ROUTE_CTL], ibuf_rde_ctl);
if (pauseaccept == 0) {
pfd[PFD_SOCK_CTL].fd = csock;
@@ -1389,6 +1378,19 @@ session_sendmsg(struct bgp_msg *msg, str
}
ibuf_close(&p->wbuf, msg->buf);
+ /*
+ * Too many messages are queued for this peer: ask the RDE to
+ * stop generating updates for it. Only mark the peer throttled
+ * once the XOFF was actually queued, so the flag always reflects
+ * what the RDE was told and a failed send is retried.
+ */
+ if (!p->throttled && p->wbuf.queued > SESS_MSG_HIGH_MARK) {
+ if (imsg_compose(ibuf_rde, IMSG_XOFF, p->conf.id, 0, -1,
+ NULL, 0) == -1)
+ log_peer_warn(&p->conf, "imsg_compose XOFF");
+ else
+ p->throttled = 1;
+ }
+
free(msg);
return (0);
}
@@ -1773,6 +1769,19 @@ session_dispatch_msg(struct pollfd *pfd,
log_peer_warn(&p->conf, "write error");
bgp_fsm(p, EVNT_CON_FATAL);
return (1);
+ }
+ /*
+ * The write queue drained below the low water mark: let the
+ * RDE resume this peer. Clear the flag only if the XON was
+ * queued; otherwise the RDE would stay throttled while the SE
+ * believes it is not, and no further XON would be sent. On
+ * failure the XON is retried the next time this path runs.
+ */
+ if (p->throttled && p->wbuf.queued < SESS_MSG_LOW_MARK) {
+ if (imsg_compose(ibuf_rde, IMSG_XON, p->conf.id, 0, -1,
+ NULL, 0) == -1)
+ log_peer_warn(&p->conf, "imsg_compose XON");
+ else
+ p->throttled = 0;
}
if (!(pfd->revents & POLLIN))
return (1);
Index: session.h
===================================================================
RCS file: /cvs/src/usr.sbin/bgpd/session.h,v
retrieving revision 1.122
diff -u -p -r1.122 session.h
--- session.h 13 Jan 2017 18:59:12 -0000 1.122
+++ session.h 19 Jan 2017 23:32:14 -0000
@@ -142,6 +142,7 @@ struct ctl_conn {
TAILQ_ENTRY(ctl_conn) entry;
struct imsgbuf ibuf;
int restricted;
+ int throttled; /* IMSG_XOFF sent to the RDE */
};
TAILQ_HEAD(ctl_conns, ctl_conn) ctl_conns;
@@ -225,6 +226,7 @@ struct peer {
u_int8_t depend_ok;
u_int8_t demoted;
u_int8_t passive;
+ u_int8_t throttled; /* IMSG_XOFF sent to the RDE */
};
extern struct peer *peers;