Currently rpki-client keeps all pending work on a queue and only removes
it from the queue at once it got processed. The only bit that the parent
rpki-client process needs from the queue is the type when processing the
response. So instead of passing the id pass the type back from the parser.

With this the queue only holds entries that can't be processed right now
because the repository is not yet loaded. Additionally the handling of
responses becomes more decoupled.

All in all I think this simplifies the code a fair bit. What do others
think?
-- 
:wq Claudio

Index: main.c
===================================================================
RCS file: /cvs/src/usr.sbin/rpki-client/main.c,v
retrieving revision 1.88
diff -u -p -r1.88 main.c
--- main.c      21 Dec 2020 11:35:55 -0000      1.88
+++ main.c      7 Jan 2021 13:22:02 -0000
@@ -88,6 +88,7 @@ struct        repo {
        size_t   id; /* identifier (array index) */
 };
 
+size_t entity_queue;
 int    timeout = 60*60;
 volatile sig_atomic_t killme;
 void   suicide(int sig);
@@ -105,7 +106,6 @@ static struct       repotab {
  * and parsed.
  */
 struct entity {
-       size_t           id; /* unique identifier */
        enum rtype       type; /* type of entity (not RTYPE_EOF) */
        char            *uri; /* file or rsync:// URI */
        int              has_dgst; /* whether dgst is specified */
@@ -223,7 +223,6 @@ static void
 entity_read_req(int fd, struct entity *ent)
 {
 
-       io_simple_read(fd, &ent->id, sizeof(size_t));
        io_simple_read(fd, &ent->type, sizeof(enum rtype));
        io_str_read(fd, &ent->uri);
        io_simple_read(fd, &ent->has_dgst, sizeof(int));
@@ -244,7 +243,6 @@ entity_buffer_req(char **b, size_t *bsz,
     const struct entity *ent)
 {
 
-       io_simple_buffer(b, bsz, bmax, &ent->id, sizeof(size_t));
        io_simple_buffer(b, bsz, bmax, &ent->type, sizeof(enum rtype));
        io_str_buffer(b, bsz, bmax, ent->uri);
        io_simple_buffer(b, bsz, bmax, &ent->has_dgst, sizeof(int));
@@ -278,12 +276,14 @@ entity_write_req(int fd, const struct en
 static void
 entityq_flush(int fd, struct entityq *q, const struct repo *repo)
 {
-       struct entity   *p;
+       struct entity   *p, *np;
 
-       TAILQ_FOREACH(p, q, entries) {
+       TAILQ_FOREACH_SAFE(p, q, entries, np) {
                if (p->repo < 0 || repo->id != (size_t)p->repo)
                        continue;
                entity_write_req(fd, p);
+               TAILQ_REMOVE(q, p, entries);
+               entity_free(p);
        }
 }
 
@@ -365,49 +365,18 @@ repo_filename(const struct repo *repo, c
 }
 
 /*
- * Read the next entity from the parser process, removing it from the
- * queue of pending requests in the process.
- * This always returns a valid entity.
- */
-static struct entity *
-entityq_next(int fd, struct entityq *q)
-{
-       size_t           id;
-       struct entity   *entp;
-
-       io_simple_read(fd, &id, sizeof(size_t));
-
-       TAILQ_FOREACH(entp, q, entries)
-               if (entp->id == id)
-                       break;
-
-       assert(entp != NULL);
-       TAILQ_REMOVE(q, entp, entries);
-       return entp;
-}
-
-static void
-entity_buffer_resp(char **b, size_t *bsz, size_t *bmax,
-    const struct entity *ent)
-{
-
-       io_simple_buffer(b, bsz, bmax, &ent->id, sizeof(size_t));
-}
-
-/*
  * Add the heap-allocated file to the queue for processing.
  */
 static void
 entityq_add(int fd, struct entityq *q, char *file, enum rtype type,
     const struct repo *rp, const unsigned char *dgst,
-    const unsigned char *pkey, size_t pkeysz, char *descr, size_t *eid)
+    const unsigned char *pkey, size_t pkeysz, char *descr)
 {
        struct entity   *p;
 
        if ((p = calloc(1, sizeof(struct entity))) == NULL)
                err(1, "calloc");
 
-       p->id = (*eid)++;
        p->type = type;
        p->uri = file;
        p->repo = (rp != NULL) ? (ssize_t)rp->id : -1;
@@ -426,15 +395,19 @@ entityq_add(int fd, struct entityq *q, c
                        err(1, "strdup");
 
        filepath_add(file);
-       TAILQ_INSERT_TAIL(q, p, entries);
+
+       entity_queue++;
 
        /*
         * Write to the queue if there's no repo or the repo has already
-        * been loaded.
+        * been loaded else enqueue it for later.
         */
 
-       if (rp == NULL || rp->loaded)
+       if (rp == NULL || rp->loaded) {
                entity_write_req(fd, p);
+               entity_free(p);
+       } else
+               TAILQ_INSERT_TAIL(q, p, entries);
 }
 
 /*
@@ -443,7 +416,7 @@ entityq_add(int fd, struct entityq *q, c
  */
 static void
 queue_add_from_mft(int fd, struct entityq *q, const char *mft,
-    const struct mftfile *file, enum rtype type, size_t *eid)
+    const struct mftfile *file, enum rtype type)
 {
        char            *cp, *nfile;
 
@@ -461,7 +434,7 @@ queue_add_from_mft(int fd, struct entity
         * that the repository has already been loaded.
         */
 
-       entityq_add(fd, q, nfile, type, NULL, file->hash, NULL, 0, NULL, eid);
+       entityq_add(fd, q, nfile, type, NULL, file->hash, NULL, 0, NULL);
 }
 
 /*
@@ -473,8 +446,7 @@ queue_add_from_mft(int fd, struct entity
  * check the suffix anyway).
  */
 static void
-queue_add_from_mft_set(int fd, struct entityq *q, const struct mft *mft,
-    size_t *eid)
+queue_add_from_mft_set(int fd, struct entityq *q, const struct mft *mft)
 {
        size_t                   i, sz;
        const struct mftfile    *f;
@@ -485,7 +457,7 @@ queue_add_from_mft_set(int fd, struct en
                assert(sz > 4);
                if (strcasecmp(f->file + sz - 4, ".crl"))
                        continue;
-               queue_add_from_mft(fd, q, mft->file, f, RTYPE_CRL, eid);
+               queue_add_from_mft(fd, q, mft->file, f, RTYPE_CRL);
        }
 
        for (i = 0; i < mft->filesz; i++) {
@@ -494,7 +466,7 @@ queue_add_from_mft_set(int fd, struct en
                assert(sz > 4);
                if (strcasecmp(f->file + sz - 4, ".cer"))
                        continue;
-               queue_add_from_mft(fd, q, mft->file, f, RTYPE_CER, eid);
+               queue_add_from_mft(fd, q, mft->file, f, RTYPE_CER);
        }
 
        for (i = 0; i < mft->filesz; i++) {
@@ -503,7 +475,7 @@ queue_add_from_mft_set(int fd, struct en
                assert(sz > 4);
                if (strcasecmp(f->file + sz - 4, ".roa"))
                        continue;
-               queue_add_from_mft(fd, q, mft->file, f, RTYPE_ROA, eid);
+               queue_add_from_mft(fd, q, mft->file, f, RTYPE_ROA);
        }
 
        for (i = 0; i < mft->filesz; i++) {
@@ -512,7 +484,7 @@ queue_add_from_mft_set(int fd, struct en
                assert(sz > 4);
                if (strcasecmp(f->file + sz - 4, ".gbr"))
                        continue;
-               queue_add_from_mft(fd, q, mft->file, f, RTYPE_GBR, eid);
+               queue_add_from_mft(fd, q, mft->file, f, RTYPE_GBR);
        }
 
        for (i = 0; i < mft->filesz; i++) {
@@ -532,7 +504,7 @@ queue_add_from_mft_set(int fd, struct en
  * Add a local TAL file (RFC 7730) to the queue of files to fetch.
  */
 static void
-queue_add_tal(int fd, struct entityq *q, const char *file, size_t *eid)
+queue_add_tal(int fd, struct entityq *q, const char *file)
 {
        char    *nfile, *buf;
 
@@ -552,7 +524,7 @@ queue_add_tal(int fd, struct entityq *q,
        }
 
        /* Not in a repository, so directly add to queue. */
-       entityq_add(fd, q, nfile, RTYPE_TAL, NULL, NULL, NULL, 0, buf, eid);
+       entityq_add(fd, q, nfile, RTYPE_TAL, NULL, NULL, NULL, 0, buf);
        /* entityq_add makes a copy of buf */
        free(buf);
 }
@@ -562,7 +534,7 @@ queue_add_tal(int fd, struct entityq *q,
  */
 static void
 queue_add_from_tal(int proc, int rsync, struct entityq *q,
-    const struct tal *tal, size_t *eid)
+    const struct tal *tal)
 {
        char                    *nfile;
        const struct repo       *repo;
@@ -584,7 +556,7 @@ queue_add_from_tal(int proc, int rsync, 
        nfile = repo_filename(repo, uri);
 
        entityq_add(proc, q, nfile, RTYPE_CER, repo, NULL, tal->pkey,
-           tal->pkeysz, tal->descr, eid);
+           tal->pkeysz, tal->descr);
 }
 
 /*
@@ -592,7 +564,7 @@ queue_add_from_tal(int proc, int rsync, 
  */
 static void
 queue_add_from_cert(int proc, int rsync, struct entityq *q,
-    const char *rsyncuri, const char *rrdpuri, size_t *eid)
+    const char *rsyncuri, const char *rrdpuri)
 {
        char                    *nfile;
        const struct repo       *repo;
@@ -604,7 +576,7 @@ queue_add_from_cert(int proc, int rsync,
        repo = repo_lookup(rsync, rsyncuri);
        nfile = repo_filename(repo, rsyncuri);
 
-       entityq_add(proc, q, nfile, RTYPE_MFT, repo, NULL, NULL, 0, NULL, eid);
+       entityq_add(proc, q, nfile, RTYPE_MFT, repo, NULL, NULL, 0, NULL);
 }
 
 /*
@@ -1133,7 +1105,8 @@ proc_parser(int fd)
                entp = TAILQ_FIRST(&q);
                assert(entp != NULL);
 
-               entity_buffer_resp(&b, &bsz, &bmax, entp);
+               io_simple_buffer(&b, &bsz, &bmax, &entp->type,
+                   sizeof(entp->type));
 
                switch (entp->type) {
                case RTYPE_TAL:
@@ -1216,9 +1189,9 @@ out:
  */
 static void
 entity_process(int proc, int rsync, struct stats *st,
-    struct entityq *q, const struct entity *ent,
-    size_t *eid, struct vrp_tree *tree)
+    struct entityq *q, struct vrp_tree *tree)
 {
+       enum rtype      type;
        struct tal      *tal;
        struct cert     *cert;
        struct mft      *mft;
@@ -1231,12 +1204,13 @@ entity_process(int proc, int rsync, stru
         * certificate, for example).
         * We follow that up with whether the resources didn't parse.
         */
+       io_simple_read(proc, &type, sizeof(type));
 
-       switch (ent->type) {
+       switch (type) {
        case RTYPE_TAL:
                st->tals++;
                tal = tal_read(proc);
-               queue_add_from_tal(proc, rsync, q, tal, eid);
+               queue_add_from_tal(proc, rsync, q, tal);
                tal_free(tal);
                break;
        case RTYPE_CER:
@@ -1255,7 +1229,7 @@ entity_process(int proc, int rsync, stru
                         * process the MFT.
                         */
                        queue_add_from_cert(proc, rsync,
-                           q, cert->mft, cert->notify, eid);
+                           q, cert->mft, cert->notify);
                } else
                        st->certs_invalid++;
                cert_free(cert);
@@ -1270,7 +1244,7 @@ entity_process(int proc, int rsync, stru
                mft = mft_read(proc);
                if (mft->stale)
                        st->mfts_stale++;
-               queue_add_from_mft_set(proc, q, mft, eid);
+               queue_add_from_mft_set(proc, q, mft);
                mft_free(mft);
                break;
        case RTYPE_CRL:
@@ -1296,6 +1270,8 @@ entity_process(int proc, int rsync, stru
        default:
                abort();
        }
+
+       entity_queue--;
 }
 
 /*
@@ -1425,11 +1401,10 @@ main(int argc, char *argv[])
 {
        int              rc = 1, c, proc, st, rsync,
                         fl = SOCK_STREAM | SOCK_CLOEXEC;
-       size_t           i, j, eid = 1, outsz = 0, talsz = 0;
+       size_t           i, j, outsz = 0, talsz = 0;
        pid_t            procpid, rsyncpid;
        int              fd[2];
        struct entityq   q;
-       struct entity   *ent;
        struct pollfd    pfd[2];
        struct roa      **out = NULL;
        char            *rsync_prog = "openrsync";
@@ -1616,7 +1591,7 @@ main(int argc, char *argv[])
         */
 
        for (i = 0; i < talsz; i++)
-               queue_add_tal(proc, &q, tals[i], &eid);
+               queue_add_tal(proc, &q, tals[i]);
 
        /*
         * The main process drives the top-down scan to leaf ROAs using
@@ -1628,7 +1603,7 @@ main(int argc, char *argv[])
        pfd[1].fd = proc;
        pfd[0].events = pfd[1].events = POLLIN;
 
-       while (!TAILQ_EMPTY(&q) && !killme) {
+       while (entity_queue > 0 && !killme) {
                if ((c = poll(pfd, 2, verbose ? 10000 : INFTIM)) == -1) {
                        if (errno == EINTR)
                                continue;
@@ -1642,10 +1617,7 @@ main(int argc, char *argv[])
                                if (!rt.repos[i].loaded)
                                        j++;
                        logx("period stats: %zu pending repos", j);
-                       j = 0;
-                       TAILQ_FOREACH(ent, &q, entries)
-                               j++;
-                       logx("period stats: %zu pending entries", j);
+                       logx("period stats: %zu pending entries", entity_queue);
                        continue;
                }
 
@@ -1687,12 +1659,7 @@ main(int argc, char *argv[])
                 */
 
                if ((pfd[1].revents & POLLIN)) {
-                       ent = entityq_next(proc, &q);
-                       entity_process(proc, rsync, &stats,
-                           &q, ent, &eid, &v);
-                       if (verbose > 2)
-                               fprintf(stderr, "%s\n", ent->uri);
-                       entity_free(ent);
+                       entity_process(proc, rsync, &stats, &q, &v);
                }
        }
 

Reply via email to