Currently each worker thread is stateless. This patch allows us to bind
some per-worker fields to each worker thread, which could be reused
accross different tasks.

One of the examples is the compressor and buffer used in multi-threaded
compression.

Signed-off-by: Yifan Zhao <[email protected]>
---
 include/erofs/workqueue.h | 10 +++++++---
 lib/workqueue.c           | 19 ++++++++++++++++++-
 2 files changed, 25 insertions(+), 4 deletions(-)

diff --git a/include/erofs/workqueue.h b/include/erofs/workqueue.h
index d0ebc5a..a11a8fc 100644
--- a/include/erofs/workqueue.h
+++ b/include/erofs/workqueue.h
@@ -13,11 +13,13 @@ struct erofs_work;
 
 typedef void erofs_workqueue_func_t(struct erofs_workqueue *wq,
                                    struct erofs_work *work);
+typedef void erofs_wq_priv_fini_t(void *);
 
 struct erofs_work {
        struct erofs_workqueue  *queue;
        struct erofs_work       *next;
        erofs_workqueue_func_t  *function;
+       void                    *private;
 };
 
 struct erofs_workqueue {
@@ -32,12 +34,14 @@ struct erofs_workqueue {
        bool                    terminated;
        int                     max_queued;
        pthread_cond_t          queue_full;
+       size_t                  private_size;
+       erofs_wq_priv_fini_t    *private_fini;
 };
 
-int erofs_workqueue_create(struct erofs_workqueue *wq,
+int erofs_workqueue_create(struct erofs_workqueue *wq, size_t private_size,
+                          erofs_wq_priv_fini_t *private_fini,
                           unsigned int nr_workers, unsigned int max_queue);
-int erofs_workqueue_add(struct erofs_workqueue *wq,
-                       struct erofs_work *wi);
+int erofs_workqueue_add(struct erofs_workqueue *wq, struct erofs_work *wi);
 int erofs_workqueue_terminate(struct erofs_workqueue *wq);
 void erofs_workqueue_destroy(struct erofs_workqueue *wq);
 
diff --git a/lib/workqueue.c b/lib/workqueue.c
index 6573821..01d12d9 100644
--- a/lib/workqueue.c
+++ b/lib/workqueue.c
@@ -18,6 +18,12 @@ static void *workqueue_thread(void *arg)
 {
        struct erofs_workqueue          *wq = arg;
        struct erofs_work               *wi;
+       void                            *private = NULL;
+
+       if (wq->private_size) {
+               private = calloc(1, wq->private_size);
+               assert(private);
+       }
 
        /*
         * Loop pulling work from the passed in work queue.
@@ -56,13 +62,22 @@ static void *workqueue_thread(void *arg)
                }
                pthread_mutex_unlock(&wq->lock);
 
+               wi->private = private;
                (wi->function)(wq, wi);
        }
+
+       if (private) {
+               assert(wq->private_fini);
+               (wq->private_fini)(private);
+               free(private);
+       }
+
        return NULL;
 }
 
 /* Allocate a work queue and threads.  Returns zero or negative error code. */
-int erofs_workqueue_create(struct erofs_workqueue *wq,
+int erofs_workqueue_create(struct erofs_workqueue *wq, size_t private_size,
+                          erofs_wq_priv_fini_t *priv_fini,
                           unsigned int nr_workers, unsigned int max_queue)
 {
        unsigned int            i;
@@ -79,6 +94,8 @@ int erofs_workqueue_create(struct erofs_workqueue *wq,
        if (err)
                goto out_cond;
 
+       wq->private_size = private_size;
+       wq->private_fini = priv_fini;
        wq->thread_count = nr_workers;
        wq->max_queued = max_queue;
        wq->threads = malloc(nr_workers * sizeof(pthread_t));
-- 
2.43.0

Reply via email to