On Wed, Mar 10, 2021 at 10:12:51AM +0100, Claudio Jeker wrote:
> The entity queue is per repository. It is a queue of files that depend on
> this repository and need to wait until the repository finished its sync.
> There is no benefit of a global queue.
> 
> In my opinion this is more understandable.

Yes, this is much clearer.

ok tb

> -- 
> :wq Claudio
> 
> Index: main.c
> ===================================================================
> RCS file: /cvs/src/usr.sbin/rpki-client/main.c,v
> retrieving revision 1.115
> diff -u -p -r1.115 main.c
> --- main.c    10 Mar 2021 08:09:41 -0000      1.115
> +++ main.c    10 Mar 2021 08:20:00 -0000
> @@ -57,6 +57,7 @@ struct      repo {
>       char            *local;         /* local path name */
>       char            *temp;          /* temporary file / dir */
>       char            *uris[REPO_MAX_URI];    /* URIs to fetch from */
> +     struct entityq   queue;         /* files waiting for this repo */
>       size_t           id;            /* identifier (array index) */
>       int              uriidx;        /* which URI is fetched */
>       int              loaded;        /* whether loaded or not */
> @@ -208,15 +209,13 @@ entity_write_req(const struct entity *en
>   * repo, then flush those into the parser process.
>   */
>  static void
> -entityq_flush(struct entityq *q, const struct repo *repo)
> +entityq_flush(struct repo *repo)
>  {
>       struct entity   *p, *np;
>  
> -     TAILQ_FOREACH_SAFE(p, q, entries, np) {
> -             if (p->repo < 0 || repo->id != (size_t)p->repo)
> -                     continue;
> +     TAILQ_FOREACH_SAFE(p, &repo->queue, entries, np) {
>               entity_write_req(p);
> -             TAILQ_REMOVE(q, p, entries);
> +             TAILQ_REMOVE(&repo->queue, p, entries);
>               entity_free(p);
>       }
>  }
> @@ -225,9 +224,8 @@ entityq_flush(struct entityq *q, const s
>   * Add the heap-allocated file to the queue for processing.
>   */
>  static void
> -entityq_add(struct entityq *q, char *file, enum rtype type,
> -    const struct repo *rp, const unsigned char *pkey, size_t pkeysz,
> -    char *descr)
> +entityq_add(char *file, enum rtype type, struct repo *rp,
> +    const unsigned char *pkey, size_t pkeysz, char *descr)
>  {
>       struct entity   *p;
>  
> @@ -261,7 +259,7 @@ entityq_add(struct entityq *q, char *fil
>               entity_write_req(p);
>               entity_free(p);
>       } else
> -             TAILQ_INSERT_TAIL(q, p, entries);
> +             TAILQ_INSERT_TAIL(&rp->queue, p, entries);
>  }
>  
>  /*
> @@ -279,6 +277,7 @@ repo_alloc(void)
>  
>       rp = &rt.repos[rt.reposz++];
>       rp->id = rt.reposz - 1;
> +     TAILQ_INIT(&rp->queue);
>  
>       return rp;
>  }
> @@ -390,7 +389,7 @@ repo_fetch(struct repo *rp)
>  /*
>   * Look up a trust anchor, queueing it for download if not found.
>   */
> -static const struct repo *
> +static struct repo *
>  ta_lookup(const struct tal *tal)
>  {
>       struct repo     *rp;
> @@ -424,7 +423,7 @@ ta_lookup(const struct tal *tal)
>  /*
>   * Look up a repository, queueing it for discovery if not found.
>   */
> -static const struct repo *
> +static struct repo *
>  repo_lookup(const char *uri)
>  {
>       char            *local, *repo;
> @@ -494,8 +493,7 @@ repo_filename(const struct repo *repo, c
>   * These are always relative to the directory in which "mft" sits.
>   */
>  static void
> -queue_add_from_mft(struct entityq *q, const char *mft,
> -    const struct mftfile *file, enum rtype type)
> +queue_add_from_mft(const char *mft, const struct mftfile *file, enum rtype 
> type)
>  {
>       char            *cp, *nfile;
>  
> @@ -511,7 +509,7 @@ queue_add_from_mft(struct entityq *q, co
>        * that the repository has already been loaded.
>        */
>  
> -     entityq_add(q, nfile, type, NULL, NULL, 0, NULL);
> +     entityq_add(nfile, type, NULL, NULL, 0, NULL);
>  }
>  
>  /*
> @@ -523,7 +521,7 @@ queue_add_from_mft(struct entityq *q, co
>   * check the suffix anyway).
>   */
>  static void
> -queue_add_from_mft_set(struct entityq *q, const struct mft *mft)
> +queue_add_from_mft_set(const struct mft *mft)
>  {
>       size_t                   i, sz;
>       const struct mftfile    *f;
> @@ -534,7 +532,7 @@ queue_add_from_mft_set(struct entityq *q
>               assert(sz > 4);
>               if (strcasecmp(f->file + sz - 4, ".crl") != 0)
>                       continue;
> -             queue_add_from_mft(q, mft->file, f, RTYPE_CRL);
> +             queue_add_from_mft(mft->file, f, RTYPE_CRL);
>       }
>  
>       for (i = 0; i < mft->filesz; i++) {
> @@ -544,11 +542,11 @@ queue_add_from_mft_set(struct entityq *q
>               if (strcasecmp(f->file + sz - 4, ".crl") == 0)
>                       continue;
>               else if (strcasecmp(f->file + sz - 4, ".cer") == 0)
> -                     queue_add_from_mft(q, mft->file, f, RTYPE_CER);
> +                     queue_add_from_mft(mft->file, f, RTYPE_CER);
>               else if (strcasecmp(f->file + sz - 4, ".roa") == 0)
> -                     queue_add_from_mft(q, mft->file, f, RTYPE_ROA);
> +                     queue_add_from_mft(mft->file, f, RTYPE_ROA);
>               else if (strcasecmp(f->file + sz - 4, ".gbr") == 0)
> -                     queue_add_from_mft(q, mft->file, f, RTYPE_GBR);
> +                     queue_add_from_mft(mft->file, f, RTYPE_GBR);
>               else
>                       logx("%s: unsupported file type: %s", mft->file,
>                           f->file);
> @@ -559,7 +557,7 @@ queue_add_from_mft_set(struct entityq *q
>   * Add a local TAL file (RFC 7730) to the queue of files to fetch.
>   */
>  static void
> -queue_add_tal(struct entityq *q, const char *file)
> +queue_add_tal(const char *file)
>  {
>       char    *nfile, *buf;
>  
> @@ -580,7 +578,7 @@ queue_add_tal(struct entityq *q, const c
>       }
>  
>       /* Not in a repository, so directly add to queue. */
> -     entityq_add(q, nfile, RTYPE_TAL, NULL, NULL, 0, buf);
> +     entityq_add(nfile, RTYPE_TAL, NULL, NULL, 0, buf);
>       /* entityq_add makes a copy of buf */
>       free(buf);
>  }
> @@ -589,10 +587,10 @@ queue_add_tal(struct entityq *q, const c
>   * Add URIs (CER) from a TAL file, RFC 8630.
>   */
>  static void
> -queue_add_from_tal(struct entityq *q, const struct tal *tal)
> +queue_add_from_tal(const struct tal *tal)
>  {
> -     char                    *nfile;
> -     const struct repo       *repo;
> +     char            *nfile;
> +     struct repo     *repo;
>  
>       assert(tal->urisz);
>  
> @@ -600,7 +598,7 @@ queue_add_from_tal(struct entityq *q, co
>       repo = ta_lookup(tal);
>  
>       nfile = ta_filename(repo, 0);
> -     entityq_add(q, nfile, RTYPE_CER, repo, tal->pkey,
> +     entityq_add(nfile, RTYPE_CER, repo, tal->pkey,
>           tal->pkeysz, tal->descr);
>  }
>  
> @@ -608,10 +606,10 @@ queue_add_from_tal(struct entityq *q, co
>   * Add a manifest (MFT) found in an X509 certificate, RFC 6487.
>   */
>  static void
> -queue_add_from_cert(struct entityq *q, const struct cert *cert)
> +queue_add_from_cert(const struct cert *cert)
>  {
> -     const struct repo       *repo;
> -     char                    *nfile;
> +     struct repo     *repo;
> +     char            *nfile;
>  
>       repo = repo_lookup(cert->mft);
>       if (repo == NULL) /* bad repository URI */
> @@ -619,7 +617,7 @@ queue_add_from_cert(struct entityq *q, c
>  
>       nfile = repo_filename(repo, cert->mft);
>  
> -     entityq_add(q, nfile, RTYPE_MFT, repo, NULL, 0, NULL);
> +     entityq_add(nfile, RTYPE_MFT, repo, NULL, 0, NULL);
>  }
>  
>  /*
> @@ -629,8 +627,7 @@ queue_add_from_cert(struct entityq *q, c
>   * In all cases, we gather statistics.
>   */
>  static void
> -entity_process(int proc, struct stats *st, struct entityq *q,
> -    struct vrp_tree *tree)
> +entity_process(int proc, struct stats *st, struct vrp_tree *tree)
>  {
>       enum rtype      type;
>       struct tal      *tal;
> @@ -651,7 +648,7 @@ entity_process(int proc, struct stats *s
>       case RTYPE_TAL:
>               st->tals++;
>               tal = tal_read(proc);
> -             queue_add_from_tal(q, tal);
> +             queue_add_from_tal(tal);
>               tal_free(tal);
>               break;
>       case RTYPE_CER:
> @@ -669,7 +666,7 @@ entity_process(int proc, struct stats *s
>                        * we're revoked and then we don't want to
>                        * process the MFT.
>                        */
> -                     queue_add_from_cert(q, cert);
> +                     queue_add_from_cert(cert);
>               } else
>                       st->certs_invalid++;
>               cert_free(cert);
> @@ -684,7 +681,7 @@ entity_process(int proc, struct stats *s
>               mft = mft_read(proc);
>               if (mft->stale)
>                       st->mfts_stale++;
> -             queue_add_from_mft_set(q, mft);
> +             queue_add_from_mft_set(mft);
>               mft_free(mft);
>               break;
>       case RTYPE_CRL:
> @@ -838,7 +835,6 @@ main(int argc, char *argv[])
>       size_t           i, outsz = 0, talsz = 0;
>       pid_t            procpid, rsyncpid, httppid;
>       int              fd[2];
> -     struct entityq   q;
>       struct pollfd    pfd[3];
>       struct roa      **out = NULL;
>       char            *rsync_prog = "openrsync";
> @@ -953,8 +949,6 @@ main(int argc, char *argv[])
>       if (talsz == 0)
>               err(1, "no TAL files found in %s", "/etc/rpki");
>  
> -     TAILQ_INIT(&q);
> -
>       /* change working directory to the cache directory */
>       if (fchdir(cachefd) == -1)
>               err(1, "fchdir");
> @@ -1074,7 +1068,7 @@ main(int argc, char *argv[])
>        */
>  
>       for (i = 0; i < talsz; i++)
> -             queue_add_tal(&q, tals[i]);
> +             queue_add_tal(tals[i]);
>  
>       while (entity_queue > 0 && !killme) {
>               pfd[0].events = POLLIN;
> @@ -1150,7 +1144,7 @@ main(int argc, char *argv[])
>                                   "fallback to cache", rt.repos[i].local);
>                       rt.repos[i].loaded = 1;
>                       stats.repos++;
> -                     entityq_flush(&q, &rt.repos[i]);
> +                     entityq_flush(&rt.repos[i]);
>               }
>  
>               if ((pfd[2].revents & POLLIN)) {
> @@ -1163,7 +1157,7 @@ main(int argc, char *argv[])
>                       if (http_done(&rt.repos[i], ok)) {
>                               rt.repos[i].loaded = 1;
>                               stats.repos++;
> -                             entityq_flush(&q, &rt.repos[i]);
> +                             entityq_flush(&rt.repos[i]);
>                       }
>               }
>  
> @@ -1173,7 +1167,7 @@ main(int argc, char *argv[])
>                */
>  
>               if ((pfd[1].revents & POLLIN)) {
> -                     entity_process(proc, &stats, &q, &v);
> +                     entity_process(proc, &stats, &v);
>               }
>       }
>  
> @@ -1183,7 +1177,7 @@ main(int argc, char *argv[])
>               errx(1, "excessive runtime (%d seconds), giving up", timeout);
>       }
>  
> -     assert(TAILQ_EMPTY(&q));
> +     assert(entity_queue == 0);
>       logx("all files parsed: generating output");
>       rc = 0;
>  
> 

Reply via email to