On Wed, Mar 10, 2021 at 10:12:51AM +0100, Claudio Jeker wrote: > The entity queue is per repository. It is a queue of files that depend on > this repository and need to wait until the repository finished its sync. > There is no benefit of a global queue. > > In my opinion this is more understandable.
Yes, this is much clearer. ok tb > -- > :wq Claudio > > Index: main.c > =================================================================== > RCS file: /cvs/src/usr.sbin/rpki-client/main.c,v > retrieving revision 1.115 > diff -u -p -r1.115 main.c > --- main.c 10 Mar 2021 08:09:41 -0000 1.115 > +++ main.c 10 Mar 2021 08:20:00 -0000 > @@ -57,6 +57,7 @@ struct repo { > char *local; /* local path name */ > char *temp; /* temporary file / dir */ > char *uris[REPO_MAX_URI]; /* URIs to fetch from */ > + struct entityq queue; /* files waiting for this repo */ > size_t id; /* identifier (array index) */ > int uriidx; /* which URI is fetched */ > int loaded; /* whether loaded or not */ > @@ -208,15 +209,13 @@ entity_write_req(const struct entity *en > * repo, then flush those into the parser process. > */ > static void > -entityq_flush(struct entityq *q, const struct repo *repo) > +entityq_flush(struct repo *repo) > { > struct entity *p, *np; > > - TAILQ_FOREACH_SAFE(p, q, entries, np) { > - if (p->repo < 0 || repo->id != (size_t)p->repo) > - continue; > + TAILQ_FOREACH_SAFE(p, &repo->queue, entries, np) { > entity_write_req(p); > - TAILQ_REMOVE(q, p, entries); > + TAILQ_REMOVE(&repo->queue, p, entries); > entity_free(p); > } > } > @@ -225,9 +224,8 @@ entityq_flush(struct entityq *q, const s > * Add the heap-allocated file to the queue for processing. > */ > static void > -entityq_add(struct entityq *q, char *file, enum rtype type, > - const struct repo *rp, const unsigned char *pkey, size_t pkeysz, > - char *descr) > +entityq_add(char *file, enum rtype type, struct repo *rp, > + const unsigned char *pkey, size_t pkeysz, char *descr) > { > struct entity *p; > > @@ -261,7 +259,7 @@ entityq_add(struct entityq *q, char *fil > entity_write_req(p); > entity_free(p); > } else > - TAILQ_INSERT_TAIL(q, p, entries); > + TAILQ_INSERT_TAIL(&rp->queue, p, entries); > } > > /* > @@ -279,6 +277,7 @@ repo_alloc(void) > > rp = &rt.repos[rt.reposz++]; > rp->id = rt.reposz - 1; > + TAILQ_INIT(&rp->queue); > > return rp; > } > @@ -390,7 +389,7 @@ repo_fetch(struct repo *rp) > /* > * Look up a trust anchor, queueing it for download if not found. > */ > -static const struct repo * > +static struct repo * > ta_lookup(const struct tal *tal) > { > struct repo *rp; > @@ -424,7 +423,7 @@ ta_lookup(const struct tal *tal) > /* > * Look up a repository, queueing it for discovery if not found. > */ > -static const struct repo * > +static struct repo * > repo_lookup(const char *uri) > { > char *local, *repo; > @@ -494,8 +493,7 @@ repo_filename(const struct repo *repo, c > * These are always relative to the directory in which "mft" sits. > */ > static void > -queue_add_from_mft(struct entityq *q, const char *mft, > - const struct mftfile *file, enum rtype type) > +queue_add_from_mft(const char *mft, const struct mftfile *file, enum rtype > type) > { > char *cp, *nfile; > > @@ -511,7 +509,7 @@ queue_add_from_mft(struct entityq *q, co > * that the repository has already been loaded. > */ > > - entityq_add(q, nfile, type, NULL, NULL, 0, NULL); > + entityq_add(nfile, type, NULL, NULL, 0, NULL); > } > > /* > @@ -523,7 +521,7 @@ queue_add_from_mft(struct entityq *q, co > * check the suffix anyway). > */ > static void > -queue_add_from_mft_set(struct entityq *q, const struct mft *mft) > +queue_add_from_mft_set(const struct mft *mft) > { > size_t i, sz; > const struct mftfile *f; > @@ -534,7 +532,7 @@ queue_add_from_mft_set(struct entityq *q > assert(sz > 4); > if (strcasecmp(f->file + sz - 4, ".crl") != 0) > continue; > - queue_add_from_mft(q, mft->file, f, RTYPE_CRL); > + queue_add_from_mft(mft->file, f, RTYPE_CRL); > } > > for (i = 0; i < mft->filesz; i++) { > @@ -544,11 +542,11 @@ queue_add_from_mft_set(struct entityq *q > if (strcasecmp(f->file + sz - 4, ".crl") == 0) > continue; > else if (strcasecmp(f->file + sz - 4, ".cer") == 0) > - queue_add_from_mft(q, mft->file, f, RTYPE_CER); > + queue_add_from_mft(mft->file, f, RTYPE_CER); > else if (strcasecmp(f->file + sz - 4, ".roa") == 0) > - queue_add_from_mft(q, mft->file, f, RTYPE_ROA); > + queue_add_from_mft(mft->file, f, RTYPE_ROA); > else if (strcasecmp(f->file + sz - 4, ".gbr") == 0) > - queue_add_from_mft(q, mft->file, f, RTYPE_GBR); > + queue_add_from_mft(mft->file, f, RTYPE_GBR); > else > logx("%s: unsupported file type: %s", mft->file, > f->file); > @@ -559,7 +557,7 @@ queue_add_from_mft_set(struct entityq *q > * Add a local TAL file (RFC 7730) to the queue of files to fetch. > */ > static void > -queue_add_tal(struct entityq *q, const char *file) > +queue_add_tal(const char *file) > { > char *nfile, *buf; > > @@ -580,7 +578,7 @@ queue_add_tal(struct entityq *q, const c > } > > /* Not in a repository, so directly add to queue. */ > - entityq_add(q, nfile, RTYPE_TAL, NULL, NULL, 0, buf); > + entityq_add(nfile, RTYPE_TAL, NULL, NULL, 0, buf); > /* entityq_add makes a copy of buf */ > free(buf); > } > @@ -589,10 +587,10 @@ queue_add_tal(struct entityq *q, const c > * Add URIs (CER) from a TAL file, RFC 8630. > */ > static void > -queue_add_from_tal(struct entityq *q, const struct tal *tal) > +queue_add_from_tal(const struct tal *tal) > { > - char *nfile; > - const struct repo *repo; > + char *nfile; > + struct repo *repo; > > assert(tal->urisz); > > @@ -600,7 +598,7 @@ queue_add_from_tal(struct entityq *q, co > repo = ta_lookup(tal); > > nfile = ta_filename(repo, 0); > - entityq_add(q, nfile, RTYPE_CER, repo, tal->pkey, > + entityq_add(nfile, RTYPE_CER, repo, tal->pkey, > tal->pkeysz, tal->descr); > } > > @@ -608,10 +606,10 @@ queue_add_from_tal(struct entityq *q, co > * Add a manifest (MFT) found in an X509 certificate, RFC 6487. > */ > static void > -queue_add_from_cert(struct entityq *q, const struct cert *cert) > +queue_add_from_cert(const struct cert *cert) > { > - const struct repo *repo; > - char *nfile; > + struct repo *repo; > + char *nfile; > > repo = repo_lookup(cert->mft); > if (repo == NULL) /* bad repository URI */ > @@ -619,7 +617,7 @@ queue_add_from_cert(struct entityq *q, c > > nfile = repo_filename(repo, cert->mft); > > - entityq_add(q, nfile, RTYPE_MFT, repo, NULL, 0, NULL); > + entityq_add(nfile, RTYPE_MFT, repo, NULL, 0, NULL); > } > > /* > @@ -629,8 +627,7 @@ queue_add_from_cert(struct entityq *q, c > * In all cases, we gather statistics. > */ > static void > -entity_process(int proc, struct stats *st, struct entityq *q, > - struct vrp_tree *tree) > +entity_process(int proc, struct stats *st, struct vrp_tree *tree) > { > enum rtype type; > struct tal *tal; > @@ -651,7 +648,7 @@ entity_process(int proc, struct stats *s > case RTYPE_TAL: > st->tals++; > tal = tal_read(proc); > - queue_add_from_tal(q, tal); > + queue_add_from_tal(tal); > tal_free(tal); > break; > case RTYPE_CER: > @@ -669,7 +666,7 @@ entity_process(int proc, struct stats *s > * we're revoked and then we don't want to > * process the MFT. > */ > - queue_add_from_cert(q, cert); > + queue_add_from_cert(cert); > } else > st->certs_invalid++; > cert_free(cert); > @@ -684,7 +681,7 @@ entity_process(int proc, struct stats *s > mft = mft_read(proc); > if (mft->stale) > st->mfts_stale++; > - queue_add_from_mft_set(q, mft); > + queue_add_from_mft_set(mft); > mft_free(mft); > break; > case RTYPE_CRL: > @@ -838,7 +835,6 @@ main(int argc, char *argv[]) > size_t i, outsz = 0, talsz = 0; > pid_t procpid, rsyncpid, httppid; > int fd[2]; > - struct entityq q; > struct pollfd pfd[3]; > struct roa **out = NULL; > char *rsync_prog = "openrsync"; > @@ -953,8 +949,6 @@ main(int argc, char *argv[]) > if (talsz == 0) > err(1, "no TAL files found in %s", "/etc/rpki"); > > - TAILQ_INIT(&q); > - > /* change working directory to the cache directory */ > if (fchdir(cachefd) == -1) > err(1, "fchdir"); > @@ -1074,7 +1068,7 @@ main(int argc, char *argv[]) > */ > > for (i = 0; i < talsz; i++) > - queue_add_tal(&q, tals[i]); > + queue_add_tal(tals[i]); > > while (entity_queue > 0 && !killme) { > pfd[0].events = POLLIN; > @@ -1150,7 +1144,7 @@ main(int argc, char *argv[]) > "fallback to cache", rt.repos[i].local); > rt.repos[i].loaded = 1; > stats.repos++; > - entityq_flush(&q, &rt.repos[i]); > + entityq_flush(&rt.repos[i]); > } > > if ((pfd[2].revents & POLLIN)) { > @@ -1163,7 +1157,7 @@ main(int argc, char *argv[]) > if (http_done(&rt.repos[i], ok)) { > rt.repos[i].loaded = 1; > stats.repos++; > - entityq_flush(&q, &rt.repos[i]); > + entityq_flush(&rt.repos[i]); > } > } > > @@ -1173,7 +1167,7 @@ main(int argc, char *argv[]) > */ > > if ((pfd[1].revents & POLLIN)) { > - entity_process(proc, &stats, &q, &v); > + entity_process(proc, &stats, &v); > } > } > > @@ -1183,7 +1177,7 @@ main(int argc, char *argv[]) > errx(1, "excessive runtime (%d seconds), giving up", timeout); > } > > - assert(TAILQ_EMPTY(&q)); > + assert(entity_queue == 0); > logx("all files parsed: generating output"); > rc = 0; > >