Currently rpki-client keeps all pending work on a queue and only removes
it from the queue at once it got processed. The only bit that the parent
rpki-client process needs from the queue is the type when processing the
response. So instead of passing the id pass the type back from the parser.
With this the queue only holds entries that can't be processed right now
because the repository is not yet loaded. Additionally the handling of
responses becomes more decoupled.
All in all I think this simplifies the code a fair bit. What do others
think?
--
:wq Claudio
Index: main.c
===================================================================
RCS file: /cvs/src/usr.sbin/rpki-client/main.c,v
retrieving revision 1.88
diff -u -p -r1.88 main.c
--- main.c 21 Dec 2020 11:35:55 -0000 1.88
+++ main.c 7 Jan 2021 13:22:02 -0000
@@ -88,6 +88,7 @@ struct repo {
size_t id; /* identifier (array index) */
};
+size_t entity_queue;
int timeout = 60*60;
volatile sig_atomic_t killme;
void suicide(int sig);
@@ -105,7 +106,6 @@ static struct repotab {
* and parsed.
*/
struct entity {
- size_t id; /* unique identifier */
enum rtype type; /* type of entity (not RTYPE_EOF) */
char *uri; /* file or rsync:// URI */
int has_dgst; /* whether dgst is specified */
@@ -223,7 +223,6 @@ static void
entity_read_req(int fd, struct entity *ent)
{
- io_simple_read(fd, &ent->id, sizeof(size_t));
io_simple_read(fd, &ent->type, sizeof(enum rtype));
io_str_read(fd, &ent->uri);
io_simple_read(fd, &ent->has_dgst, sizeof(int));
@@ -244,7 +243,6 @@ entity_buffer_req(char **b, size_t *bsz,
const struct entity *ent)
{
- io_simple_buffer(b, bsz, bmax, &ent->id, sizeof(size_t));
io_simple_buffer(b, bsz, bmax, &ent->type, sizeof(enum rtype));
io_str_buffer(b, bsz, bmax, ent->uri);
io_simple_buffer(b, bsz, bmax, &ent->has_dgst, sizeof(int));
@@ -278,12 +276,14 @@ entity_write_req(int fd, const struct en
static void
entityq_flush(int fd, struct entityq *q, const struct repo *repo)
{
- struct entity *p;
+ struct entity *p, *np;
- TAILQ_FOREACH(p, q, entries) {
+ TAILQ_FOREACH_SAFE(p, q, entries, np) {
if (p->repo < 0 || repo->id != (size_t)p->repo)
continue;
entity_write_req(fd, p);
+ TAILQ_REMOVE(q, p, entries);
+ entity_free(p);
}
}
@@ -365,49 +365,18 @@ repo_filename(const struct repo *repo, c
}
/*
- * Read the next entity from the parser process, removing it from the
- * queue of pending requests in the process.
- * This always returns a valid entity.
- */
-static struct entity *
-entityq_next(int fd, struct entityq *q)
-{
- size_t id;
- struct entity *entp;
-
- io_simple_read(fd, &id, sizeof(size_t));
-
- TAILQ_FOREACH(entp, q, entries)
- if (entp->id == id)
- break;
-
- assert(entp != NULL);
- TAILQ_REMOVE(q, entp, entries);
- return entp;
-}
-
-static void
-entity_buffer_resp(char **b, size_t *bsz, size_t *bmax,
- const struct entity *ent)
-{
-
- io_simple_buffer(b, bsz, bmax, &ent->id, sizeof(size_t));
-}
-
-/*
* Add the heap-allocated file to the queue for processing.
*/
static void
entityq_add(int fd, struct entityq *q, char *file, enum rtype type,
const struct repo *rp, const unsigned char *dgst,
- const unsigned char *pkey, size_t pkeysz, char *descr, size_t *eid)
+ const unsigned char *pkey, size_t pkeysz, char *descr)
{
struct entity *p;
if ((p = calloc(1, sizeof(struct entity))) == NULL)
err(1, "calloc");
- p->id = (*eid)++;
p->type = type;
p->uri = file;
p->repo = (rp != NULL) ? (ssize_t)rp->id : -1;
@@ -426,15 +395,19 @@ entityq_add(int fd, struct entityq *q, c
err(1, "strdup");
filepath_add(file);
- TAILQ_INSERT_TAIL(q, p, entries);
+
+ entity_queue++;
/*
* Write to the queue if there's no repo or the repo has already
- * been loaded.
+ * been loaded else enqueue it for later.
*/
- if (rp == NULL || rp->loaded)
+ if (rp == NULL || rp->loaded) {
entity_write_req(fd, p);
+ entity_free(p);
+ } else
+ TAILQ_INSERT_TAIL(q, p, entries);
}
/*
@@ -443,7 +416,7 @@ entityq_add(int fd, struct entityq *q, c
*/
static void
queue_add_from_mft(int fd, struct entityq *q, const char *mft,
- const struct mftfile *file, enum rtype type, size_t *eid)
+ const struct mftfile *file, enum rtype type)
{
char *cp, *nfile;
@@ -461,7 +434,7 @@ queue_add_from_mft(int fd, struct entity
* that the repository has already been loaded.
*/
- entityq_add(fd, q, nfile, type, NULL, file->hash, NULL, 0, NULL, eid);
+ entityq_add(fd, q, nfile, type, NULL, file->hash, NULL, 0, NULL);
}
/*
@@ -473,8 +446,7 @@ queue_add_from_mft(int fd, struct entity
* check the suffix anyway).
*/
static void
-queue_add_from_mft_set(int fd, struct entityq *q, const struct mft *mft,
- size_t *eid)
+queue_add_from_mft_set(int fd, struct entityq *q, const struct mft *mft)
{
size_t i, sz;
const struct mftfile *f;
@@ -485,7 +457,7 @@ queue_add_from_mft_set(int fd, struct en
assert(sz > 4);
if (strcasecmp(f->file + sz - 4, ".crl"))
continue;
- queue_add_from_mft(fd, q, mft->file, f, RTYPE_CRL, eid);
+ queue_add_from_mft(fd, q, mft->file, f, RTYPE_CRL);
}
for (i = 0; i < mft->filesz; i++) {
@@ -494,7 +466,7 @@ queue_add_from_mft_set(int fd, struct en
assert(sz > 4);
if (strcasecmp(f->file + sz - 4, ".cer"))
continue;
- queue_add_from_mft(fd, q, mft->file, f, RTYPE_CER, eid);
+ queue_add_from_mft(fd, q, mft->file, f, RTYPE_CER);
}
for (i = 0; i < mft->filesz; i++) {
@@ -503,7 +475,7 @@ queue_add_from_mft_set(int fd, struct en
assert(sz > 4);
if (strcasecmp(f->file + sz - 4, ".roa"))
continue;
- queue_add_from_mft(fd, q, mft->file, f, RTYPE_ROA, eid);
+ queue_add_from_mft(fd, q, mft->file, f, RTYPE_ROA);
}
for (i = 0; i < mft->filesz; i++) {
@@ -512,7 +484,7 @@ queue_add_from_mft_set(int fd, struct en
assert(sz > 4);
if (strcasecmp(f->file + sz - 4, ".gbr"))
continue;
- queue_add_from_mft(fd, q, mft->file, f, RTYPE_GBR, eid);
+ queue_add_from_mft(fd, q, mft->file, f, RTYPE_GBR);
}
for (i = 0; i < mft->filesz; i++) {
@@ -532,7 +504,7 @@ queue_add_from_mft_set(int fd, struct en
* Add a local TAL file (RFC 7730) to the queue of files to fetch.
*/
static void
-queue_add_tal(int fd, struct entityq *q, const char *file, size_t *eid)
+queue_add_tal(int fd, struct entityq *q, const char *file)
{
char *nfile, *buf;
@@ -552,7 +524,7 @@ queue_add_tal(int fd, struct entityq *q,
}
/* Not in a repository, so directly add to queue. */
- entityq_add(fd, q, nfile, RTYPE_TAL, NULL, NULL, NULL, 0, buf, eid);
+ entityq_add(fd, q, nfile, RTYPE_TAL, NULL, NULL, NULL, 0, buf);
/* entityq_add makes a copy of buf */
free(buf);
}
@@ -562,7 +534,7 @@ queue_add_tal(int fd, struct entityq *q,
*/
static void
queue_add_from_tal(int proc, int rsync, struct entityq *q,
- const struct tal *tal, size_t *eid)
+ const struct tal *tal)
{
char *nfile;
const struct repo *repo;
@@ -584,7 +556,7 @@ queue_add_from_tal(int proc, int rsync,
nfile = repo_filename(repo, uri);
entityq_add(proc, q, nfile, RTYPE_CER, repo, NULL, tal->pkey,
- tal->pkeysz, tal->descr, eid);
+ tal->pkeysz, tal->descr);
}
/*
@@ -592,7 +564,7 @@ queue_add_from_tal(int proc, int rsync,
*/
static void
queue_add_from_cert(int proc, int rsync, struct entityq *q,
- const char *rsyncuri, const char *rrdpuri, size_t *eid)
+ const char *rsyncuri, const char *rrdpuri)
{
char *nfile;
const struct repo *repo;
@@ -604,7 +576,7 @@ queue_add_from_cert(int proc, int rsync,
repo = repo_lookup(rsync, rsyncuri);
nfile = repo_filename(repo, rsyncuri);
- entityq_add(proc, q, nfile, RTYPE_MFT, repo, NULL, NULL, 0, NULL, eid);
+ entityq_add(proc, q, nfile, RTYPE_MFT, repo, NULL, NULL, 0, NULL);
}
/*
@@ -1133,7 +1105,8 @@ proc_parser(int fd)
entp = TAILQ_FIRST(&q);
assert(entp != NULL);
- entity_buffer_resp(&b, &bsz, &bmax, entp);
+ io_simple_buffer(&b, &bsz, &bmax, &entp->type,
+ sizeof(entp->type));
switch (entp->type) {
case RTYPE_TAL:
@@ -1216,9 +1189,9 @@ out:
*/
static void
entity_process(int proc, int rsync, struct stats *st,
- struct entityq *q, const struct entity *ent,
- size_t *eid, struct vrp_tree *tree)
+ struct entityq *q, struct vrp_tree *tree)
{
+ enum rtype type;
struct tal *tal;
struct cert *cert;
struct mft *mft;
@@ -1231,12 +1204,13 @@ entity_process(int proc, int rsync, stru
* certificate, for example).
* We follow that up with whether the resources didn't parse.
*/
+ io_simple_read(proc, &type, sizeof(type));
- switch (ent->type) {
+ switch (type) {
case RTYPE_TAL:
st->tals++;
tal = tal_read(proc);
- queue_add_from_tal(proc, rsync, q, tal, eid);
+ queue_add_from_tal(proc, rsync, q, tal);
tal_free(tal);
break;
case RTYPE_CER:
@@ -1255,7 +1229,7 @@ entity_process(int proc, int rsync, stru
* process the MFT.
*/
queue_add_from_cert(proc, rsync,
- q, cert->mft, cert->notify, eid);
+ q, cert->mft, cert->notify);
} else
st->certs_invalid++;
cert_free(cert);
@@ -1270,7 +1244,7 @@ entity_process(int proc, int rsync, stru
mft = mft_read(proc);
if (mft->stale)
st->mfts_stale++;
- queue_add_from_mft_set(proc, q, mft, eid);
+ queue_add_from_mft_set(proc, q, mft);
mft_free(mft);
break;
case RTYPE_CRL:
@@ -1296,6 +1270,8 @@ entity_process(int proc, int rsync, stru
default:
abort();
}
+
+ entity_queue--;
}
/*
@@ -1425,11 +1401,10 @@ main(int argc, char *argv[])
{
int rc = 1, c, proc, st, rsync,
fl = SOCK_STREAM | SOCK_CLOEXEC;
- size_t i, j, eid = 1, outsz = 0, talsz = 0;
+ size_t i, j, outsz = 0, talsz = 0;
pid_t procpid, rsyncpid;
int fd[2];
struct entityq q;
- struct entity *ent;
struct pollfd pfd[2];
struct roa **out = NULL;
char *rsync_prog = "openrsync";
@@ -1616,7 +1591,7 @@ main(int argc, char *argv[])
*/
for (i = 0; i < talsz; i++)
- queue_add_tal(proc, &q, tals[i], &eid);
+ queue_add_tal(proc, &q, tals[i]);
/*
* The main process drives the top-down scan to leaf ROAs using
@@ -1628,7 +1603,7 @@ main(int argc, char *argv[])
pfd[1].fd = proc;
pfd[0].events = pfd[1].events = POLLIN;
- while (!TAILQ_EMPTY(&q) && !killme) {
+ while (entity_queue > 0 && !killme) {
if ((c = poll(pfd, 2, verbose ? 10000 : INFTIM)) == -1) {
if (errno == EINTR)
continue;
@@ -1642,10 +1617,7 @@ main(int argc, char *argv[])
if (!rt.repos[i].loaded)
j++;
logx("period stats: %zu pending repos", j);
- j = 0;
- TAILQ_FOREACH(ent, &q, entries)
- j++;
- logx("period stats: %zu pending entries", j);
+ logx("period stats: %zu pending entries", entity_queue);
continue;
}
@@ -1687,12 +1659,7 @@ main(int argc, char *argv[])
*/
if ((pfd[1].revents & POLLIN)) {
- ent = entityq_next(proc, &q);
- entity_process(proc, rsync, &stats,
- &q, ent, &eid, &v);
- if (verbose > 2)
- fprintf(stderr, "%s\n", ent->uri);
- entity_free(ent);
+ entity_process(proc, rsync, &stats, &q, &v);
}
}