On 2024/2/20 15:55, Yifan Zhao wrote:
Add a workqueue implementation for multi-threading support inspired by
xfsprogs.

Signed-off-by: Yifan Zhao <zhaoyi...@sjtu.edu.cn>
Suggested-by: Gao Xiang <hsiang...@linux.alibaba.com>
---
  configure.ac              |  16 +++++
  include/erofs/internal.h  |   3 +
  include/erofs/workqueue.h |  38 +++++++++++
  lib/Makefile.am           |   4 ++
  lib/workqueue.c           | 132 ++++++++++++++++++++++++++++++++++++++
  5 files changed, 193 insertions(+)
  create mode 100644 include/erofs/workqueue.h
  create mode 100644 lib/workqueue.c

diff --git a/configure.ac b/configure.ac
index bf6e99f..53306c3 100644
--- a/configure.ac
+++ b/configure.ac
@@ -96,6 +96,14 @@ AC_DEFUN([EROFS_UTILS_PARSE_DIRECTORY],
  AC_ARG_VAR([MAX_BLOCK_SIZE], [The maximum block size which erofs-utils supports])
+AC_MSG_CHECKING([whether to enable multi-threading support])
+AC_ARG_ENABLE([multithreading],
+    AS_HELP_STRING([--enable-multithreading],
+                   [enable multi-threading support @<:@default=no@:>@]),
+    [enable_multithreading="$enableval"],
+    [enable_multithreading="no"])
+AC_MSG_RESULT([$enable_multithreading])
+
  AC_ARG_ENABLE([debug],
      [AS_HELP_STRING([--enable-debug],
                      [enable debugging mode @<:@default=no@:>@])],
@@ -288,6 +296,13 @@ AS_IF([test "x$MAX_BLOCK_SIZE" = "x"], [
                               [erofs_cv_max_block_size=4096]))
  ], [erofs_cv_max_block_size=$MAX_BLOCK_SIZE])
+# Configure multi-threading support
+AS_IF([test "x$enable_multithreading" != "xno"], [
+    AC_CHECK_HEADERS([pthread.h])
+    AC_CHECK_LIB([pthread], [pthread_mutex_lock], [], AC_MSG_ERROR([libpthread is required for multi-threaded build]))
+    AC_DEFINE(EROFS_MT_ENABLED, 1, [Enable multi-threading support])
+], [])
+
  # Configure debug mode
  AS_IF([test "x$enable_debug" != "xno"], [], [
    dnl Turn off all assert checking.
@@ -467,6 +482,7 @@ AS_IF([test "x$enable_fuzzing" != "xyes"], [], [
  AM_CONDITIONAL([ENABLE_FUZZING], [test "x${enable_fuzzing}" = "xyes"])
# Set up needed symbols, conditionals and compiler/linker flags
+AM_CONDITIONAL([ENABLE_EROFS_MT], [test "x${enable_multithreading}" != "xno"])
  AM_CONDITIONAL([ENABLE_LZ4], [test "x${have_lz4}" = "xyes"])
  AM_CONDITIONAL([ENABLE_LZ4HC], [test "x${have_lz4hc}" = "xyes"])
  AM_CONDITIONAL([ENABLE_FUSE], [test "x${have_fuse}" = "xyes"])
diff --git a/include/erofs/internal.h b/include/erofs/internal.h
index 82797e1..954aef4 100644
--- a/include/erofs/internal.h
+++ b/include/erofs/internal.h
@@ -22,6 +22,9 @@ typedef unsigned short umode_t;
  #include <sys/types.h> /* for off_t definition */
  #include <sys/stat.h> /* for S_ISCHR definition */
  #include <stdio.h>
+#ifdef HAVE_PTHREAD_H
+#include <pthread.h>
+#endif
#ifndef PATH_MAX
  #define PATH_MAX        4096    /* # chars in a path name including nul */
diff --git a/include/erofs/workqueue.h b/include/erofs/workqueue.h
new file mode 100644
index 0000000..857947b
--- /dev/null
+++ b/include/erofs/workqueue.h
@@ -0,0 +1,38 @@
+/* SPDX-License-Identifier: GPL-2.0+ OR Apache-2.0 */
+#ifndef __EROFS_WORKQUEUE_H
+#define __EROFS_WORKQUEUE_H
+
+#include "internal.h"
+
+struct erofs_work;
+
+typedef void erofs_wq_func_t(struct erofs_work *); /* function type with the same shape as erofs_work.func */
+typedef void erofs_wq_priv_fini_t(void *); /* finalizer run on a worker's private buffer before it is freed */
+
+struct erofs_work { /* one queued job; items form a singly linked list inside the workqueue */
+       void (*func)(struct erofs_work *work); /* job body; a worker thread calls it with this item */
+       struct erofs_work *next; /* next queued job; reset to NULL by erofs_workqueue_add() */
+       void *priv; /* overwritten with the executing worker's private buffer (or NULL) before func runs */
+};
+
+struct erofs_workqueue { /* bounded FIFO of jobs served by a fixed set of worker threads */
+       struct erofs_work *head; /* oldest queued job; workers dequeue from here */
+       struct erofs_work *tail; /* newest queued job; erofs_workqueue_add() appends here */

I'd suggest
struct erofs_work *head, *tail;

since they seem the same functionality.

+       pthread_mutex_t lock; /* held while head, tail, job_count or shutdown are accessed */
+       pthread_cond_t cond_empty; /* workers wait here while the queue is empty; signalled on add and on shutdown */
+       pthread_cond_t cond_full; /* producers wait here while job_count == max_jobs */
+       pthread_t *workers; /* heap array of nworker thread handles */
+       unsigned int nworker; /* number of worker threads */
+       unsigned int max_jobs; /* queue capacity; erofs_workqueue_add() blocks once this many jobs are pending */
+       unsigned int job_count; /* jobs currently queued and not yet dequeued */
+       bool shutdown; /* set by erofs_workqueue_shutdown(); workers exit once the queue is drained */
+       size_t priv_size; /* bytes of zero-filled private memory allocated per worker; 0 means none */
+       erofs_wq_priv_fini_t *priv_fini; /* called on each worker's private buffer before it is freed */
+};
+
+int erofs_workqueue_init(struct erofs_workqueue *wq, unsigned int nworker,
+                        unsigned int max_jobs, size_t priv_size,
+                        erofs_wq_priv_fini_t *priv_fini); /* set up wq and start nworker threads; 0 or a negative errno */
+int erofs_workqueue_add(struct erofs_workqueue *wq, struct erofs_work *work); /* enqueue work, blocking while the queue is full; 0 or -EINVAL */
+int erofs_workqueue_shutdown(struct erofs_workqueue *wq); /* let workers drain the queue, join them and release resources; 0 or -EINVAL */
+#endif
\ No newline at end of file
diff --git a/lib/Makefile.am b/lib/Makefile.am
index 54b9c9c..7307f7b 100644
--- a/lib/Makefile.am
+++ b/lib/Makefile.am
@@ -53,3 +53,7 @@ liberofs_la_SOURCES += kite_deflate.c compressor_deflate.c
  if ENABLE_LIBDEFLATE
  liberofs_la_SOURCES += compressor_libdeflate.c
  endif
+if ENABLE_EROFS_MT
+# -lpthread is a linker input and has no effect in CFLAGS; -pthread is the
+# compile-time threading switch. The library itself is expected to reach the
+# link line through LIBS (default action of AC_CHECK_LIB in configure.ac).
+liberofs_la_CFLAGS += -pthread
+liberofs_la_SOURCES += workqueue.c
+endif
diff --git a/lib/workqueue.c b/lib/workqueue.c
new file mode 100644
index 0000000..3ec6142
--- /dev/null
+++ b/lib/workqueue.c
@@ -0,0 +1,132 @@
+// SPDX-License-Identifier: GPL-2.0+ OR Apache-2.0
+#include <pthread.h>
+#include <stdlib.h>
+#include "erofs/workqueue.h"
+
+/*
+ * Worker thread entry point.
+ *
+ * @arg is the owning struct erofs_workqueue.  If wq->priv_size is non-zero
+ * the thread allocates a zero-filled private buffer of that size and hands
+ * it to every job it runs through work->priv.  The thread then dequeues and
+ * runs jobs in FIFO order until shutdown has been requested and the queue
+ * is empty, so jobs still queued at shutdown time are executed.
+ *
+ * Always returns NULL.
+ */
+static void *worker_thread(void *arg)
+{
+       struct erofs_workqueue *wq = arg;
+       struct erofs_work *work;
+       void *priv = NULL;
+
+       if (wq->priv_size) {
+               priv = calloc(1, wq->priv_size);
+               /*
+                * assert() is compiled out in NDEBUG builds, which would let
+                * jobs run with a NULL priv; fail loudly in every build.
+                */
+               if (!priv)
+                       abort();
+       }
+
+       while (true) {
+               pthread_mutex_lock(&wq->lock);
+
+               /* sleep until there is work or shutdown is requested */
+               while (wq->job_count == 0 && !wq->shutdown)
+                       pthread_cond_wait(&wq->cond_empty, &wq->lock);
+               if (wq->job_count == 0 && wq->shutdown) {
+                       pthread_mutex_unlock(&wq->lock);
+                       break;
+               }
+
+               /* pop the oldest job */
+               work = wq->head;
+               wq->head = work->next;
+               if (!wq->head)
+                       wq->tail = NULL;
+               wq->job_count--;
+
+               /* the queue just left the full state: wake blocked producers */
+               if (wq->job_count == wq->max_jobs - 1)
+                       pthread_cond_broadcast(&wq->cond_full);
+
+               pthread_mutex_unlock(&wq->lock);
+
+               /* run the job outside the lock so other workers can proceed */
+               work->priv = priv;
+               work->func(work);
+       }
+
+       if (priv) {
+               /* the finalizer is optional; a plain buffer needs no teardown */
+               if (wq->priv_fini)
+                       (wq->priv_fini)(priv);
+               free(priv);
+       }
+
+       return NULL;
+}
+
+/*
+ * Initialize @wq and start @nworker worker threads.
+ *
+ * @max_jobs:  queue capacity; erofs_workqueue_add() blocks while this many
+ *             jobs are pending.
+ * @priv_size: size of the zero-filled private buffer given to each worker
+ *             (0 for none).
+ * @priv_fini: finalizer called on each worker's private buffer at exit.
+ *
+ * Returns 0 on success, -EINVAL on bad arguments, -ENOMEM if the thread
+ * table cannot be allocated, or the negated pthread_create() error code.
+ * On failure every worker already started has been joined and the
+ * synchronization objects destroyed, so @wq holds no resources.
+ */
+int erofs_workqueue_init(struct erofs_workqueue *wq, unsigned int nworker,
+                        unsigned int max_jobs, size_t priv_size,
+                        erofs_wq_priv_fini_t *priv_fini)

Let's match kernel workqueue naming...

erofs_alloc_workqueue...

+{
+       unsigned int i;
+       int err;
+
+       if (!wq || !nworker || !max_jobs)
+               return -EINVAL;
+
+       wq->head = wq->tail = NULL;
+       wq->nworker = nworker;
+       wq->max_jobs = max_jobs;
+       wq->job_count = 0;
+       wq->shutdown = false;
+       wq->priv_size = priv_size;
+       wq->priv_fini = priv_fini;
+       pthread_mutex_init(&wq->lock, NULL);
+       pthread_cond_init(&wq->cond_empty, NULL);
+       pthread_cond_init(&wq->cond_full, NULL);
+
+       /* calloc() checks nworker * sizeof(pthread_t) for overflow */
+       wq->workers = calloc(nworker, sizeof(*wq->workers));
+       if (!wq->workers) {
+               err = ENOMEM;
+               goto err_destroy;
+       }
+
+       for (i = 0; i < nworker; i++) {
+               err = pthread_create(&wq->workers[i], NULL, worker_thread, wq);
+               if (err)
+                       goto err_stop;
+       }
+
+       return 0;
+
+err_stop:
+       /*
+        * Stop the workers that did start through the normal shutdown
+        * handshake and join them.  pthread_cancel() could terminate a
+        * thread inside pthread_cond_wait() with the mutex re-acquired,
+        * and an unjoined thread could still touch @wq after we return.
+        */
+       pthread_mutex_lock(&wq->lock);
+       wq->shutdown = true;
+       pthread_cond_broadcast(&wq->cond_empty);
+       pthread_mutex_unlock(&wq->lock);
+       while (i--)
+               pthread_join(wq->workers[i], NULL);
+       free(wq->workers);
+       wq->workers = NULL;
+err_destroy:
+       pthread_cond_destroy(&wq->cond_full);
+       pthread_cond_destroy(&wq->cond_empty);
+       pthread_mutex_destroy(&wq->lock);
+       return -err;
+}
+
+/*
+ * Append @work to the tail of @wq and wake one idle worker.
+ *
+ * Blocks while the queue already holds wq->max_jobs pending jobs.
+ * Returns 0 on success or -EINVAL if either argument is NULL.
+ */
+int erofs_workqueue_add(struct erofs_workqueue *wq, struct erofs_work *work)

erofs_queue_work


+{
+       if (wq == NULL || work == NULL)
+               return -EINVAL;
+
+       pthread_mutex_lock(&wq->lock);
+
+       /* back-pressure: sleep until a worker frees a slot */
+       for (;;) {
+               if (wq->job_count != wq->max_jobs)
+                       break;
+               pthread_cond_wait(&wq->cond_full, &wq->lock);
+       }
+
+       /* link the item in at the tail so jobs run in submission order */
+       work->next = NULL;
+       if (wq->head)
+               wq->tail->next = work;
+       else
+               wq->head = work;
+       wq->tail = work;
+       ++wq->job_count;
+
+       /* one new job: waking a single worker is enough */
+       pthread_cond_signal(&wq->cond_empty);
+       pthread_mutex_unlock(&wq->lock);
+
+       return 0;
+}
+
+/*
+ * Stop @wq: flag shutdown, wake every worker, wait for all of them to
+ * finish, then free the thread table and destroy the mutex and condition
+ * variables.  Workers only exit once the queue is empty, so jobs queued
+ * before this call still run.
+ *
+ * Returns 0 on success or -EINVAL if @wq is NULL.
+ */
+int erofs_workqueue_shutdown(struct erofs_workqueue *wq)

erofs_destroy_workqueue

Thanks,
Gao Xiang

+{
+       unsigned int idx = 0;
+
+       if (wq == NULL)
+               return -EINVAL;
+
+       /* publish the flag under the lock so no sleeping worker misses it */
+       pthread_mutex_lock(&wq->lock);
+       wq->shutdown = true;
+       pthread_cond_broadcast(&wq->cond_empty);
+       pthread_mutex_unlock(&wq->lock);
+
+       while (idx < wq->nworker) {
+               pthread_join(wq->workers[idx], NULL);
+               idx++;
+       }
+
+       /* no thread references the queue any more: release everything */
+       free(wq->workers);
+       pthread_mutex_destroy(&wq->lock);
+       pthread_cond_destroy(&wq->cond_empty);
+       pthread_cond_destroy(&wq->cond_full);
+
+       return 0;
+}
\ No newline at end of file

Reply via email to