Changes since v1:
- Re-implement the workqueue API instead of using xfsprogs' workqueue
- Use a per-worker tmpfile for multi-threaded mkfs
Gao Xiang (1): erofs-utils: add a helper to get available processors Yifan Zhao (6): erofs-utils: introduce multi-threading framework erofs-utils: mkfs: add --worker=# parameter erofs-utils: mkfs: optionally print warning in erofs_compressor_init erofs-utils: mkfs: introduce inner-file multi-threaded compression erofs-utils: mkfs: introduce inter-file multi-threaded compression erofs-utils: mkfs: use per-worker tmpfile for multi-threaded mkfs configure.ac | 17 + include/erofs/compress.h | 18 + include/erofs/config.h | 5 + include/erofs/internal.h | 6 + include/erofs/list.h | 8 + include/erofs/queue.h | 22 + include/erofs/workqueue.h | 38 ++ lib/Makefile.am | 4 + lib/compress.c | 836 ++++++++++++++++++++++++++++++------ lib/compressor.c | 7 +- lib/compressor.h | 5 +- lib/compressor_deflate.c | 4 +- lib/compressor_libdeflate.c | 4 +- lib/compressor_liblzma.c | 5 +- lib/compressor_lz4.c | 2 +- lib/compressor_lz4hc.c | 2 +- lib/config.c | 16 + lib/inode.c | 302 ++++++++++--- lib/queue.c | 64 +++ lib/workqueue.c | 132 ++++++ mkfs/main.c | 38 ++ 21 files changed, 1342 insertions(+), 193 deletions(-) create mode 100644 include/erofs/queue.h create mode 100644 include/erofs/workqueue.h create mode 100644 lib/queue.c create mode 100644 lib/workqueue.c Interdiff against v1: diff --git a/include/erofs/workqueue.h b/include/erofs/workqueue.h index a11a8fc..857947b 100644 --- a/include/erofs/workqueue.h +++ b/include/erofs/workqueue.h @@ -1,48 +1,38 @@ -/* SPDX-License-Identifier: GPL-2.0+ */ -/* - * Copyright (C) 2017 Oracle. All Rights Reserved. - * Author: Darrick J. 
Wong <[email protected]> - */ +/* SPDX-License-Identifier: GPL-2.0+ OR Apache-2.0 */ #ifndef __EROFS_WORKQUEUE_H #define __EROFS_WORKQUEUE_H #include "internal.h" -struct erofs_workqueue; struct erofs_work; -typedef void erofs_workqueue_func_t(struct erofs_workqueue *wq, - struct erofs_work *work); +typedef void erofs_wq_func_t(struct erofs_work *); typedef void erofs_wq_priv_fini_t(void *); struct erofs_work { - struct erofs_workqueue *queue; + void (*func)(struct erofs_work *work); struct erofs_work *next; - erofs_workqueue_func_t *function; - void *private; + void *priv; }; struct erofs_workqueue { - pthread_t *threads; - struct erofs_work *next_item; - struct erofs_work *last_item; + struct erofs_work *head; + struct erofs_work *tail; pthread_mutex_t lock; - pthread_cond_t wakeup; - unsigned int item_count; - unsigned int thread_count; - bool terminate; - bool terminated; - int max_queued; - pthread_cond_t queue_full; - size_t private_size; - erofs_wq_priv_fini_t *private_fini; + pthread_cond_t cond_empty; + pthread_cond_t cond_full; + pthread_t *workers; + unsigned int nworker; + unsigned int max_jobs; + unsigned int job_count; + bool shutdown; + size_t priv_size; + erofs_wq_priv_fini_t *priv_fini; }; -int erofs_workqueue_create(struct erofs_workqueue *wq, size_t private_size, - erofs_wq_priv_fini_t *private_fini, - unsigned int nr_workers, unsigned int max_queue); -int erofs_workqueue_add(struct erofs_workqueue *wq, struct erofs_work *wi); -int erofs_workqueue_terminate(struct erofs_workqueue *wq); -void erofs_workqueue_destroy(struct erofs_workqueue *wq); - +int erofs_workqueue_init(struct erofs_workqueue *wq, unsigned int nworker, + unsigned int max_jobs, size_t priv_size, + erofs_wq_priv_fini_t *priv_fini); +int erofs_workqueue_add(struct erofs_workqueue *wq, struct erofs_work *work); +int erofs_workqueue_shutdown(struct erofs_workqueue *wq); #endif \ No newline at end of file diff --git a/lib/compress.c b/lib/compress.c index d6c59b0..3fae260 100644 --- 
a/lib/compress.c +++ b/lib/compress.c @@ -8,6 +8,9 @@ #ifndef _LARGEFILE64_SOURCE #define _LARGEFILE64_SOURCE #endif +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif #include <string.h> #include <stdlib.h> #include <unistd.h> @@ -23,6 +26,13 @@ #ifdef EROFS_MT_ENABLED #include "erofs/workqueue.h" #endif +#ifdef HAVE_LINUX_FALLOC_H +#include <linux/falloc.h> +#endif + +#if defined(HAVE_FALLOCATE) && defined(FALLOC_FL_PUNCH_HOLE) +#define USE_PER_WORKER_TMPFILE 1 +#endif /* compressing configuration specified by users */ struct erofs_compress_cfg { @@ -59,6 +69,7 @@ struct z_erofs_vle_compress_ctx { int seg_num, seg_idx; FILE *tmpfile; + off_t tmpfile_off; }; struct z_erofs_write_index_ctx { @@ -75,6 +86,7 @@ struct erofs_compress_wq_private { u8 *queue; char *destbuf; struct erofs_compress_cfg *ccfg; + FILE* tmpfile; }; struct erofs_compress_work { @@ -402,6 +414,7 @@ static int write_uncompressed_extent(struct z_erofs_vle_compress_ctx *ctx, ret = fwrite(dst, erofs_blksiz(sbi), 1, ctx->tmpfile); if (ret != 1) return -EIO; + fflush(ctx->tmpfile); } else { erofs_dbg("Writing %u uncompressed data to block %u", count, ctx->blkaddr); @@ -1073,6 +1086,7 @@ void z_erofs_init_ctx(struct z_erofs_vle_compress_ctx *ctx, ctx->tof_chksum = tof_chksum; ctx->fd = fd; ctx->tmpfile = NULL; + ctx->tmpfile_off = 0; init_list_head(&ctx->extents); } @@ -1169,7 +1183,7 @@ int z_erofs_mt_private_init(struct erofs_sb_info *sbi, struct erofs_compress_cfg *lc; int ret; - if (!priv->init) { + if (unlikely(!priv->init)) { priv->init = true; priv->queue = malloc(EROFS_COMPR_QUEUE_SZ); @@ -1185,6 +1199,16 @@ int z_erofs_mt_private_init(struct erofs_sb_info *sbi, sizeof(struct erofs_compress_cfg)); if (!priv->ccfg) return -ENOMEM; + +#ifdef USE_PER_WORKER_TMPFILE +#ifndef HAVE_TMPFILE64 + priv->tmpfile = tmpfile(); +#else + priv->tmpfile = tmpfile64(); +#endif + if (!priv->tmpfile) + return -errno; +#endif } lc = &priv->ccfg[alg_id]; @@ -1214,15 +1238,18 @@ void z_erofs_mt_private_fini(void 
*private) free(priv->ccfg); free(priv->destbuf); free(priv->queue); +#ifdef USE_PER_WORKER_TMPFILE + fclose(priv->tmpfile); +#endif priv->init = false; } } -void z_erofs_mt_work(struct erofs_workqueue *wq, struct erofs_work *work) +void z_erofs_mt_work(struct erofs_work *work) { struct erofs_compress_work *cwork = (struct erofs_compress_work *)work; struct z_erofs_vle_compress_ctx *ctx = &cwork->ctx; - struct erofs_compress_wq_private *priv = work->private; + struct erofs_compress_wq_private *priv = work->priv; struct erofs_compress_file *cfile = cwork->file; erofs_blk_t blkaddr = ctx->blkaddr; u64 offset = ctx->seg_idx * cfg.c_mt_segment_size; @@ -1237,7 +1264,14 @@ void z_erofs_mt_work(struct erofs_workqueue *wq, struct erofs_work *work) ctx->queue = priv->queue; ctx->destbuf = priv->destbuf; ctx->chandle = &priv->ccfg[cwork->alg_id].handle; - +#ifdef USE_PER_WORKER_TMPFILE + ctx->tmpfile = priv->tmpfile; + ctx->tmpfile_off = ftell(ctx->tmpfile); + if (ctx->tmpfile_off == -1) { + ret = -errno; + goto out; + } +#else #ifdef HAVE_TMPFILE64 ctx->tmpfile = tmpfile64(); #else @@ -1247,13 +1281,13 @@ void z_erofs_mt_work(struct erofs_workqueue *wq, struct erofs_work *work) ret = -errno; goto out; } + ctx->tmpfile_off = 0; +#endif ret = z_erofs_compress_file(ctx, offset, blkaddr); if (ret) goto out; - fflush(ctx->tmpfile); - if (ctx->seg_idx == ctx->seg_num - 1) ret = z_erofs_handle_fragments(ctx); @@ -1273,6 +1307,7 @@ int z_erofs_mt_merge(struct erofs_compress_file *cfile, erofs_blk_t blkaddr, struct erofs_sb_info *sbi = cur->ctx.inode->sbi; struct z_erofs_write_index_ctx *ictx = cfile->ictx; char *memblock = NULL; + size_t size = 0; int ret = 0, lret; while (cur != NULL) { @@ -1289,28 +1324,32 @@ int z_erofs_mt_merge(struct erofs_compress_file *cfile, erofs_blk_t blkaddr, goto out; } - memblock = realloc(memblock, - ctx->compressed_blocks * erofs_blksiz(sbi)); + size = ctx->compressed_blocks * erofs_blksiz(sbi); + memblock = realloc(memblock, size); if (!memblock) { 
if (!ret) ret = -ENOMEM; goto out; } - lret = fseek(ctx->tmpfile, 0, SEEK_SET); - if (lret) { + lret = pread(fileno(ctx->tmpfile), memblock, size, + ctx->tmpfile_off); + if (lret != size) { if (!ret) - ret = lret; + ret = -errno; goto out; } - lret = fread(memblock, erofs_blksiz(sbi), - ctx->compressed_blocks, ctx->tmpfile); - if (lret != ctx->compressed_blocks) { +#ifdef USE_PER_WORKER_TMPFILE + lret = fallocate(fileno(ctx->tmpfile), + FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, + ctx->tmpfile_off, size); + if (lret) { if (!ret) - ret = -EIO; + ret = -errno; goto out; } +#endif lret = blk_write(sbi, memblock, blkaddr + *compressed_blocks, ctx->compressed_blocks); @@ -1322,8 +1361,9 @@ int z_erofs_mt_merge(struct erofs_compress_file *cfile, erofs_blk_t blkaddr, *compressed_blocks += ctx->compressed_blocks; out: - if (ctx->tmpfile) +#ifndef USE_PER_WORKER_TMPFILE fclose(ctx->tmpfile); +#endif tmp = cur->next; pthread_mutex_lock(&work_mutex); @@ -1405,7 +1445,7 @@ struct erofs_compress_file *z_erofs_mt_do_compress( work->dict_size = ccfg->handle.dict_size; work->file = cfile; - work->work.function = z_erofs_mt_work; + work->work.func = z_erofs_mt_work; erofs_workqueue_add(&wq, &work->work); } @@ -1749,10 +1789,10 @@ int z_erofs_compress_init(struct erofs_sb_info *sbi, struct erofs_buffer_head *s if (cfg.c_mt_worker_num == 1) { mt_enabled = false; } else { - ret = erofs_workqueue_create( - &wq, sizeof(struct erofs_compress_wq_private), - z_erofs_mt_private_fini, cfg.c_mt_worker_num, - cfg.c_mt_worker_num << 2); + ret = erofs_workqueue_init( + &wq, cfg.c_mt_worker_num, cfg.c_mt_worker_num << 2, + sizeof(struct erofs_compress_wq_private), + z_erofs_mt_private_fini); mt_enabled = !ret; } #else @@ -1777,10 +1817,9 @@ int z_erofs_compress_exit(void) if (mt_enabled) { #ifdef EROFS_MT_ENABLED - ret = erofs_workqueue_terminate(&wq); + ret = erofs_workqueue_shutdown(&wq); if (ret) return ret; - erofs_workqueue_destroy(&wq); while (work_idle) { struct erofs_compress_work *tmp = 
work_idle->next; free(work_idle); diff --git a/lib/workqueue.c b/lib/workqueue.c index 01d12d9..3ec6142 100644 --- a/lib/workqueue.c +++ b/lib/workqueue.c @@ -1,222 +1,132 @@ -// SPDX-License-Identifier: GPL-2.0+ -/* - * Copyright (C) 2017 Oracle. All Rights Reserved. - * Author: Darrick J. Wong <[email protected]> - */ +// SPDX-License-Identifier: GPL-2.0+ OR Apache-2.0 #include <pthread.h> -#include <signal.h> #include <stdlib.h> -#include <string.h> -#include <stdint.h> -#include <stdbool.h> -#include <errno.h> -#include <assert.h> #include "erofs/workqueue.h" -/* Main processing thread */ -static void *workqueue_thread(void *arg) +static void *worker_thread(void *arg) { struct erofs_workqueue *wq = arg; - struct erofs_work *wi; - void *private = NULL; + struct erofs_work *work; + void *priv = NULL; - if (wq->private_size) { - private = calloc(1, wq->private_size); - assert(private); + if (wq->priv_size) { + priv = calloc(wq->priv_size, 1); + assert(priv); } - /* - * Loop pulling work from the passed in work queue. - * Check for notification to exit after every chunk of work. - */ - while (1) { + while (true) { pthread_mutex_lock(&wq->lock); - /* - * Wait for work. - */ - while (wq->next_item == NULL && !wq->terminate) { - assert(wq->item_count == 0); - pthread_cond_wait(&wq->wakeup, &wq->lock); - } - if (wq->next_item == NULL && wq->terminate) { + while (wq->job_count == 0 && !wq->shutdown) + pthread_cond_wait(&wq->cond_empty, &wq->lock); + if (wq->job_count == 0 && wq->shutdown) { pthread_mutex_unlock(&wq->lock); break; } - /* - * Dequeue work from the head of the list. If the queue was - * full then send a wakeup if we're configured to do so. 
- */ - assert(wq->item_count > 0); - if (wq->max_queued) - pthread_cond_broadcast(&wq->queue_full); - - wi = wq->next_item; - wq->next_item = wi->next; - wq->item_count--; - - if (wq->max_queued && wq->next_item) { - /* more work, wake up another worker */ - pthread_cond_signal(&wq->wakeup); - } + work = wq->head; + wq->head = work->next; + if (!wq->head) + wq->tail = NULL; + wq->job_count--; + + if (wq->job_count == wq->max_jobs - 1) + pthread_cond_broadcast(&wq->cond_full); + pthread_mutex_unlock(&wq->lock); - wi->private = private; - (wi->function)(wq, wi); + work->priv = priv; + work->func(work); } - if (private) { - assert(wq->private_fini); - (wq->private_fini)(private); - free(private); + if (priv) { + assert(wq->priv_fini); + (wq->priv_fini)(priv); + free(priv); } return NULL; } -/* Allocate a work queue and threads. Returns zero or negative error code. */ -int erofs_workqueue_create(struct erofs_workqueue *wq, size_t private_size, - erofs_wq_priv_fini_t *priv_fini, - unsigned int nr_workers, unsigned int max_queue) +int erofs_workqueue_init(struct erofs_workqueue *wq, unsigned int nworker, + unsigned int max_jobs, size_t priv_size, + erofs_wq_priv_fini_t *priv_fini) { unsigned int i; - int err = 0; - - memset(wq, 0, sizeof(*wq)); - err = -pthread_cond_init(&wq->wakeup, NULL); - if (err) - return err; - err = -pthread_cond_init(&wq->queue_full, NULL); - if (err) - goto out_wake; - err = -pthread_mutex_init(&wq->lock, NULL); - if (err) - goto out_cond; - - wq->private_size = private_size; - wq->private_fini = priv_fini; - wq->thread_count = nr_workers; - wq->max_queued = max_queue; - wq->threads = malloc(nr_workers * sizeof(pthread_t)); - if (!wq->threads) { - err = -errno; - goto out_mutex; - } - wq->terminate = false; - wq->terminated = false; - for (i = 0; i < nr_workers; i++) { - err = -pthread_create(&wq->threads[i], NULL, workqueue_thread, - wq); - if (err) - break; - } + if (!wq || nworker <= 0 || max_jobs <= 0) + return -EINVAL; - /* - * If we 
encounter errors here, we have to signal and then wait for all - * the threads that may have been started running before we can destroy - * the workqueue. - */ - if (err) - erofs_workqueue_destroy(wq); - return err; -out_mutex: - pthread_mutex_destroy(&wq->lock); -out_cond: - pthread_cond_destroy(&wq->queue_full); -out_wake: - pthread_cond_destroy(&wq->wakeup); - return err; -} + wq->head = wq->tail = NULL; + wq->nworker = nworker; + wq->max_jobs = max_jobs; + wq->job_count = 0; + wq->shutdown = false; + wq->priv_size = priv_size; + wq->priv_fini = priv_fini; + pthread_mutex_init(&wq->lock, NULL); + pthread_cond_init(&wq->cond_empty, NULL); + pthread_cond_init(&wq->cond_full, NULL); -/* - * Create a work item consisting of a function and some arguments and schedule - * the work item to be run via the thread pool. Returns zero or a negative - * error code. - */ -int erofs_workqueue_add(struct erofs_workqueue *wq, - struct erofs_work *wi) -{ - int ret; + wq->workers = malloc(nworker * sizeof(pthread_t)); + if (!wq->workers) + return -ENOMEM; - assert(!wq->terminated); + for (i = 0; i < nworker; i++) { + if (pthread_create(&wq->workers[i], NULL, worker_thread, wq)) { + while (i--) + pthread_cancel(wq->workers[i]); + free(wq->workers); + return -ENOMEM; + } + } - if (wq->thread_count == 0) { - (wi->function)(wq, wi); return 0; } - wi->queue = wq; - wi->next = NULL; +int erofs_workqueue_add(struct erofs_workqueue *wq, struct erofs_work *work) +{ + if (!wq || !work) + return -EINVAL; - /* Now queue the new work structure to the work queue. 
*/ pthread_mutex_lock(&wq->lock); -restart: - if (wq->next_item == NULL) { - assert(wq->item_count == 0); - ret = -pthread_cond_signal(&wq->wakeup); - if (ret) { - pthread_mutex_unlock(&wq->lock); - return ret; - } - wq->next_item = wi; - } else { - /* throttle on a full queue if configured */ - if (wq->max_queued && wq->item_count == wq->max_queued) { - pthread_cond_wait(&wq->queue_full, &wq->lock); - /* - * Queue might be empty or even still full by the time - * we get the lock back, so restart the lookup so we do - * the right thing with the current state of the queue. - */ - goto restart; - } - wq->last_item->next = wi; - } - wq->last_item = wi; - wq->item_count++; + + while (wq->job_count == wq->max_jobs) + pthread_cond_wait(&wq->cond_full, &wq->lock); + + work->next = NULL; + if (!wq->head) + wq->head = work; + else + wq->tail->next = work; + wq->tail = work; + wq->job_count++; + + pthread_cond_signal(&wq->cond_empty); pthread_mutex_unlock(&wq->lock); + return 0; } -/* - * Wait for all pending work items to be processed and tear down the - * workqueue thread pool. Returns zero or a negative error code. - */ -int erofs_workqueue_terminate(struct erofs_workqueue *wq) +int erofs_workqueue_shutdown(struct erofs_workqueue *wq) { unsigned int i; - int ret; - - pthread_mutex_lock(&wq->lock); - wq->terminate = true; - pthread_mutex_unlock(&wq->lock); - ret = -pthread_cond_broadcast(&wq->wakeup); - if (ret) - return ret; - - for (i = 0; i < wq->thread_count; i++) { - ret = -pthread_join(wq->threads[i], NULL); - if (ret) - return ret; - } + if (!wq) + return -EINVAL; pthread_mutex_lock(&wq->lock); - wq->terminated = true; + wq->shutdown = true; + pthread_cond_broadcast(&wq->cond_empty); pthread_mutex_unlock(&wq->lock); - return 0; -} -/* Tear down the workqueue. 
*/ -void erofs_workqueue_destroy(struct erofs_workqueue *wq) -{ - assert(wq->terminated); + for (i = 0; i < wq->nworker; i++) + pthread_join(wq->workers[i], NULL); - free(wq->threads); + free(wq->workers); pthread_mutex_destroy(&wq->lock); - pthread_cond_destroy(&wq->wakeup); - pthread_cond_destroy(&wq->queue_full); - memset(wq, 0, sizeof(*wq)); + pthread_cond_destroy(&wq->cond_empty); + pthread_cond_destroy(&wq->cond_full); + + return 0; } \ No newline at end of file -- 2.43.2
