Currently rpki-client keeps all pending work on a queue and only removes an entry from the queue once it has been processed. The only thing the parent rpki-client process needs from the queue when processing a response is the entity type. So instead of passing the id, pass the type back from the parser.
With this the queue only holds entries that can't be processed right now because the repository is not yet loaded. Additionally the handling of responses becomes more decoupled. All in all I think this simplifies the code a fair bit. What do others think? -- :wq Claudio Index: main.c =================================================================== RCS file: /cvs/src/usr.sbin/rpki-client/main.c,v retrieving revision 1.88 diff -u -p -r1.88 main.c --- main.c 21 Dec 2020 11:35:55 -0000 1.88 +++ main.c 7 Jan 2021 13:22:02 -0000 @@ -88,6 +88,7 @@ struct repo { size_t id; /* identifier (array index) */ }; +size_t entity_queue; int timeout = 60*60; volatile sig_atomic_t killme; void suicide(int sig); @@ -105,7 +106,6 @@ static struct repotab { * and parsed. */ struct entity { - size_t id; /* unique identifier */ enum rtype type; /* type of entity (not RTYPE_EOF) */ char *uri; /* file or rsync:// URI */ int has_dgst; /* whether dgst is specified */ @@ -223,7 +223,6 @@ static void entity_read_req(int fd, struct entity *ent) { - io_simple_read(fd, &ent->id, sizeof(size_t)); io_simple_read(fd, &ent->type, sizeof(enum rtype)); io_str_read(fd, &ent->uri); io_simple_read(fd, &ent->has_dgst, sizeof(int)); @@ -244,7 +243,6 @@ entity_buffer_req(char **b, size_t *bsz, const struct entity *ent) { - io_simple_buffer(b, bsz, bmax, &ent->id, sizeof(size_t)); io_simple_buffer(b, bsz, bmax, &ent->type, sizeof(enum rtype)); io_str_buffer(b, bsz, bmax, ent->uri); io_simple_buffer(b, bsz, bmax, &ent->has_dgst, sizeof(int)); @@ -278,12 +276,14 @@ entity_write_req(int fd, const struct en static void entityq_flush(int fd, struct entityq *q, const struct repo *repo) { - struct entity *p; + struct entity *p, *np; - TAILQ_FOREACH(p, q, entries) { + TAILQ_FOREACH_SAFE(p, q, entries, np) { if (p->repo < 0 || repo->id != (size_t)p->repo) continue; entity_write_req(fd, p); + TAILQ_REMOVE(q, p, entries); + entity_free(p); } } @@ -365,49 +365,18 @@ repo_filename(const struct repo *repo, c } /* - * 
Read the next entity from the parser process, removing it from the - * queue of pending requests in the process. - * This always returns a valid entity. - */ -static struct entity * -entityq_next(int fd, struct entityq *q) -{ - size_t id; - struct entity *entp; - - io_simple_read(fd, &id, sizeof(size_t)); - - TAILQ_FOREACH(entp, q, entries) - if (entp->id == id) - break; - - assert(entp != NULL); - TAILQ_REMOVE(q, entp, entries); - return entp; -} - -static void -entity_buffer_resp(char **b, size_t *bsz, size_t *bmax, - const struct entity *ent) -{ - - io_simple_buffer(b, bsz, bmax, &ent->id, sizeof(size_t)); -} - -/* * Add the heap-allocated file to the queue for processing. */ static void entityq_add(int fd, struct entityq *q, char *file, enum rtype type, const struct repo *rp, const unsigned char *dgst, - const unsigned char *pkey, size_t pkeysz, char *descr, size_t *eid) + const unsigned char *pkey, size_t pkeysz, char *descr) { struct entity *p; if ((p = calloc(1, sizeof(struct entity))) == NULL) err(1, "calloc"); - p->id = (*eid)++; p->type = type; p->uri = file; p->repo = (rp != NULL) ? (ssize_t)rp->id : -1; @@ -426,15 +395,19 @@ entityq_add(int fd, struct entityq *q, c err(1, "strdup"); filepath_add(file); - TAILQ_INSERT_TAIL(q, p, entries); + + entity_queue++; /* * Write to the queue if there's no repo or the repo has already - * been loaded. + * been loaded else enqueue it for later. */ - if (rp == NULL || rp->loaded) + if (rp == NULL || rp->loaded) { entity_write_req(fd, p); + entity_free(p); + } else + TAILQ_INSERT_TAIL(q, p, entries); } /* @@ -443,7 +416,7 @@ entityq_add(int fd, struct entityq *q, c */ static void queue_add_from_mft(int fd, struct entityq *q, const char *mft, - const struct mftfile *file, enum rtype type, size_t *eid) + const struct mftfile *file, enum rtype type) { char *cp, *nfile; @@ -461,7 +434,7 @@ queue_add_from_mft(int fd, struct entity * that the repository has already been loaded. 
*/ - entityq_add(fd, q, nfile, type, NULL, file->hash, NULL, 0, NULL, eid); + entityq_add(fd, q, nfile, type, NULL, file->hash, NULL, 0, NULL); } /* @@ -473,8 +446,7 @@ queue_add_from_mft(int fd, struct entity * check the suffix anyway). */ static void -queue_add_from_mft_set(int fd, struct entityq *q, const struct mft *mft, - size_t *eid) +queue_add_from_mft_set(int fd, struct entityq *q, const struct mft *mft) { size_t i, sz; const struct mftfile *f; @@ -485,7 +457,7 @@ queue_add_from_mft_set(int fd, struct en assert(sz > 4); if (strcasecmp(f->file + sz - 4, ".crl")) continue; - queue_add_from_mft(fd, q, mft->file, f, RTYPE_CRL, eid); + queue_add_from_mft(fd, q, mft->file, f, RTYPE_CRL); } for (i = 0; i < mft->filesz; i++) { @@ -494,7 +466,7 @@ queue_add_from_mft_set(int fd, struct en assert(sz > 4); if (strcasecmp(f->file + sz - 4, ".cer")) continue; - queue_add_from_mft(fd, q, mft->file, f, RTYPE_CER, eid); + queue_add_from_mft(fd, q, mft->file, f, RTYPE_CER); } for (i = 0; i < mft->filesz; i++) { @@ -503,7 +475,7 @@ queue_add_from_mft_set(int fd, struct en assert(sz > 4); if (strcasecmp(f->file + sz - 4, ".roa")) continue; - queue_add_from_mft(fd, q, mft->file, f, RTYPE_ROA, eid); + queue_add_from_mft(fd, q, mft->file, f, RTYPE_ROA); } for (i = 0; i < mft->filesz; i++) { @@ -512,7 +484,7 @@ queue_add_from_mft_set(int fd, struct en assert(sz > 4); if (strcasecmp(f->file + sz - 4, ".gbr")) continue; - queue_add_from_mft(fd, q, mft->file, f, RTYPE_GBR, eid); + queue_add_from_mft(fd, q, mft->file, f, RTYPE_GBR); } for (i = 0; i < mft->filesz; i++) { @@ -532,7 +504,7 @@ queue_add_from_mft_set(int fd, struct en * Add a local TAL file (RFC 7730) to the queue of files to fetch. */ static void -queue_add_tal(int fd, struct entityq *q, const char *file, size_t *eid) +queue_add_tal(int fd, struct entityq *q, const char *file) { char *nfile, *buf; @@ -552,7 +524,7 @@ queue_add_tal(int fd, struct entityq *q, } /* Not in a repository, so directly add to queue. 
*/ - entityq_add(fd, q, nfile, RTYPE_TAL, NULL, NULL, NULL, 0, buf, eid); + entityq_add(fd, q, nfile, RTYPE_TAL, NULL, NULL, NULL, 0, buf); /* entityq_add makes a copy of buf */ free(buf); } @@ -562,7 +534,7 @@ queue_add_tal(int fd, struct entityq *q, */ static void queue_add_from_tal(int proc, int rsync, struct entityq *q, - const struct tal *tal, size_t *eid) + const struct tal *tal) { char *nfile; const struct repo *repo; @@ -584,7 +556,7 @@ queue_add_from_tal(int proc, int rsync, nfile = repo_filename(repo, uri); entityq_add(proc, q, nfile, RTYPE_CER, repo, NULL, tal->pkey, - tal->pkeysz, tal->descr, eid); + tal->pkeysz, tal->descr); } /* @@ -592,7 +564,7 @@ queue_add_from_tal(int proc, int rsync, */ static void queue_add_from_cert(int proc, int rsync, struct entityq *q, - const char *rsyncuri, const char *rrdpuri, size_t *eid) + const char *rsyncuri, const char *rrdpuri) { char *nfile; const struct repo *repo; @@ -604,7 +576,7 @@ queue_add_from_cert(int proc, int rsync, repo = repo_lookup(rsync, rsyncuri); nfile = repo_filename(repo, rsyncuri); - entityq_add(proc, q, nfile, RTYPE_MFT, repo, NULL, NULL, 0, NULL, eid); + entityq_add(proc, q, nfile, RTYPE_MFT, repo, NULL, NULL, 0, NULL); } /* @@ -1133,7 +1105,8 @@ proc_parser(int fd) entp = TAILQ_FIRST(&q); assert(entp != NULL); - entity_buffer_resp(&b, &bsz, &bmax, entp); + io_simple_buffer(&b, &bsz, &bmax, &entp->type, + sizeof(entp->type)); switch (entp->type) { case RTYPE_TAL: @@ -1216,9 +1189,9 @@ out: */ static void entity_process(int proc, int rsync, struct stats *st, - struct entityq *q, const struct entity *ent, - size_t *eid, struct vrp_tree *tree) + struct entityq *q, struct vrp_tree *tree) { + enum rtype type; struct tal *tal; struct cert *cert; struct mft *mft; @@ -1231,12 +1204,13 @@ entity_process(int proc, int rsync, stru * certificate, for example). * We follow that up with whether the resources didn't parse. 
*/ + io_simple_read(proc, &type, sizeof(type)); - switch (ent->type) { + switch (type) { case RTYPE_TAL: st->tals++; tal = tal_read(proc); - queue_add_from_tal(proc, rsync, q, tal, eid); + queue_add_from_tal(proc, rsync, q, tal); tal_free(tal); break; case RTYPE_CER: @@ -1255,7 +1229,7 @@ entity_process(int proc, int rsync, stru * process the MFT. */ queue_add_from_cert(proc, rsync, - q, cert->mft, cert->notify, eid); + q, cert->mft, cert->notify); } else st->certs_invalid++; cert_free(cert); @@ -1270,7 +1244,7 @@ entity_process(int proc, int rsync, stru mft = mft_read(proc); if (mft->stale) st->mfts_stale++; - queue_add_from_mft_set(proc, q, mft, eid); + queue_add_from_mft_set(proc, q, mft); mft_free(mft); break; case RTYPE_CRL: @@ -1296,6 +1270,8 @@ entity_process(int proc, int rsync, stru default: abort(); } + + entity_queue--; } /* @@ -1425,11 +1401,10 @@ main(int argc, char *argv[]) { int rc = 1, c, proc, st, rsync, fl = SOCK_STREAM | SOCK_CLOEXEC; - size_t i, j, eid = 1, outsz = 0, talsz = 0; + size_t i, j, outsz = 0, talsz = 0; pid_t procpid, rsyncpid; int fd[2]; struct entityq q; - struct entity *ent; struct pollfd pfd[2]; struct roa **out = NULL; char *rsync_prog = "openrsync"; @@ -1616,7 +1591,7 @@ main(int argc, char *argv[]) */ for (i = 0; i < talsz; i++) - queue_add_tal(proc, &q, tals[i], &eid); + queue_add_tal(proc, &q, tals[i]); /* * The main process drives the top-down scan to leaf ROAs using @@ -1628,7 +1603,7 @@ main(int argc, char *argv[]) pfd[1].fd = proc; pfd[0].events = pfd[1].events = POLLIN; - while (!TAILQ_EMPTY(&q) && !killme) { + while (entity_queue > 0 && !killme) { if ((c = poll(pfd, 2, verbose ? 
10000 : INFTIM)) == -1) { if (errno == EINTR) continue; @@ -1642,10 +1617,7 @@ main(int argc, char *argv[]) if (!rt.repos[i].loaded) j++; logx("period stats: %zu pending repos", j); - j = 0; - TAILQ_FOREACH(ent, &q, entries) - j++; - logx("period stats: %zu pending entries", j); + logx("period stats: %zu pending entries", entity_queue); continue; } @@ -1687,12 +1659,7 @@ main(int argc, char *argv[]) */ if ((pfd[1].revents & POLLIN)) { - ent = entityq_next(proc, &q); - entity_process(proc, rsync, &stats, - &q, ent, &eid, &v); - if (verbose > 2) - fprintf(stderr, "%s\n", ent->uri); - entity_free(ent); + entity_process(proc, rsync, &stats, &q, &v); } }