Currently each worker thread is stateless. This patch allows us to bind some per-worker fields to each worker thread, which could be reused accross different tasks.
One of the examples is the compressor and buffer used in multi-threaded compression. Signed-off-by: Yifan Zhao <[email protected]> --- include/erofs/workqueue.h | 10 +++++++--- lib/workqueue.c | 19 ++++++++++++++++++- 2 files changed, 25 insertions(+), 4 deletions(-) diff --git a/include/erofs/workqueue.h b/include/erofs/workqueue.h index d0ebc5a..a11a8fc 100644 --- a/include/erofs/workqueue.h +++ b/include/erofs/workqueue.h @@ -13,11 +13,13 @@ struct erofs_work; typedef void erofs_workqueue_func_t(struct erofs_workqueue *wq, struct erofs_work *work); +typedef void erofs_wq_priv_fini_t(void *); struct erofs_work { struct erofs_workqueue *queue; struct erofs_work *next; erofs_workqueue_func_t *function; + void *private; }; struct erofs_workqueue { @@ -32,12 +34,14 @@ struct erofs_workqueue { bool terminated; int max_queued; pthread_cond_t queue_full; + size_t private_size; + erofs_wq_priv_fini_t *private_fini; }; -int erofs_workqueue_create(struct erofs_workqueue *wq, +int erofs_workqueue_create(struct erofs_workqueue *wq, size_t private_size, + erofs_wq_priv_fini_t *private_fini, unsigned int nr_workers, unsigned int max_queue); -int erofs_workqueue_add(struct erofs_workqueue *wq, - struct erofs_work *wi); +int erofs_workqueue_add(struct erofs_workqueue *wq, struct erofs_work *wi); int erofs_workqueue_terminate(struct erofs_workqueue *wq); void erofs_workqueue_destroy(struct erofs_workqueue *wq); diff --git a/lib/workqueue.c b/lib/workqueue.c index 6573821..01d12d9 100644 --- a/lib/workqueue.c +++ b/lib/workqueue.c @@ -18,6 +18,12 @@ static void *workqueue_thread(void *arg) { struct erofs_workqueue *wq = arg; struct erofs_work *wi; + void *private = NULL; + + if (wq->private_size) { + private = calloc(1, wq->private_size); + assert(private); + } /* * Loop pulling work from the passed in work queue. @@ -56,13 +62,22 @@ static void *workqueue_thread(void *arg) } pthread_mutex_unlock(&wq->lock); + wi->private = private; (wi->function)(wq, wi); } + + if (private) { + assert(wq->private_fini); + (wq->private_fini)(private); + free(private); + } + return NULL; } /* Allocate a work queue and threads. Returns zero or negative error code. */ -int erofs_workqueue_create(struct erofs_workqueue *wq, +int erofs_workqueue_create(struct erofs_workqueue *wq, size_t private_size, + erofs_wq_priv_fini_t *priv_fini, unsigned int nr_workers, unsigned int max_queue) { unsigned int i; @@ -79,6 +94,8 @@ int erofs_workqueue_create(struct erofs_workqueue *wq, if (err) goto out_cond; + wq->private_size = private_size; + wq->private_fini = priv_fini; wq->thread_count = nr_workers; wq->max_queued = max_queue; wq->threads = malloc(nr_workers * sizeof(pthread_t)); -- 2.43.0
