Finish plumbing up everything we will need to process multiple client requests in parallel after the handshake is complete. Since the connection status is now shared by all of the connection's threads, and properly protected by a mutex, all of the threads will quit soon after any one of them notices EOF or nbdkit detects a signal.
For ease of review, the framework for configuring threads is done separately from the low-level work of utilizing the threads, so this patch introduces no behavior change (because we hard-code nworkers to 1, which takes the serial path and sets conn->nworkers to 0). It is a one-line hack to test that a larger nworkers still behaves the same even for a non-parallel plugin; in fact, such a hack was how I found and squashed several thread-safety bugs in the previous patches, which were exposed by running test-socket-activation in a loop. Signed-off-by: Eric Blake <ebl...@redhat.com> --- src/connections.c | 91 +++++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 82 insertions(+), 9 deletions(-) diff --git a/src/connections.c b/src/connections.c index 9d95e7f..41371fb 100644 --- a/src/connections.c +++ b/src/connections.c @@ -68,7 +68,7 @@ struct connection { int status; /* 1 for more I/O with client, 0 for shutdown, -1 on error */ void *handle; void *crypto_session; - int nworkers; /* TODO set up a thread pool for parallel workers */ + int nworkers; uint64_t exportsize; int readonly; @@ -83,7 +83,8 @@ struct connection { connection_close_function close; }; -static struct connection *new_connection (int sockin, int sockout); +static struct connection *new_connection (int sockin, int sockout, + int nworkers); static void free_connection (struct connection *conn); static int negotiate_handshake (struct connection *conn); static int recv_request_send_reply (struct connection *conn); @@ -175,12 +176,39 @@ set_status (struct connection *conn, int value) return value; } +struct worker_data { + struct connection *conn; + char *name; +}; + +static void * +connection_worker (void *data) +{ + struct worker_data *worker = data; + struct connection *conn = worker->conn; + char *name = worker->name; + + debug ("starting worker thread %s", name); + threadlocal_new_server_thread (); + threadlocal_set_name (name); + free (worker); + + while (!quit && get_status (conn) > 0) + recv_request_send_reply (conn); + debug ("exiting
worker thread %s", threadlocal_get_name ()); + free (name); + return NULL; +} + static int _handle_single_connection (int sockin, int sockout) { int r = -1; - struct connection *conn = new_connection (sockin, sockout); + struct connection *conn; + int nworkers = 1; /* TODO default to 16 for parallel plugins, with command-line override */ + pthread_t *workers = NULL; + conn = new_connection (sockin, sockout, nworkers); if (!conn) goto done; @@ -193,11 +221,55 @@ _handle_single_connection (int sockin, int sockout) if (negotiate_handshake (conn) == -1) goto done; - /* Process requests. XXX Allow these to be dispatched in parallel using - * a thread pool. - */ - while (!quit && get_status (conn) > 0) - recv_request_send_reply (conn); + if (nworkers <= 1) { + /* No need for a separate thread. */ + debug ("handshake complete, processing requests serially"); + conn->nworkers = 0; + while (!quit && get_status (conn) > 0) + recv_request_send_reply (conn); + } + else { + /* Create thread pool to process requests. 
*/ + debug ("handshake complete, processing requests with %d threads", + nworkers); + workers = calloc (nworkers, sizeof *workers); + if (!workers) { + perror ("malloc"); + goto done; + } + + for (nworkers = 0; nworkers < conn->nworkers; nworkers++) { + struct worker_data *worker = malloc (sizeof *worker); + int err; + + if (!worker) { + perror ("malloc"); + set_status (conn, -1); + goto wait; + } + if (asprintf (&worker->name, "%s.%d", plugin_name (), nworkers) < 0) { + perror ("asprintf"); + set_status (conn, -1); + free (worker); + goto wait; + } + worker->conn = conn; + err = pthread_create (&workers[nworkers], NULL, connection_worker, + worker); + if (err) { + errno = err; + perror ("pthread_create"); + set_status (conn, -1); + free (worker); + goto wait; + } + } + + wait: + while (nworkers) + pthread_join (workers[--nworkers], NULL); + free (workers); + } r = get_status (conn); done: @@ -218,7 +290,7 @@ handle_single_connection (int sockin, int sockout) } static struct connection * -new_connection (int sockin, int sockout) +new_connection (int sockin, int sockout, int nworkers) { struct connection *conn; @@ -229,6 +301,7 @@ new_connection (int sockin, int sockout) } conn->status = 1; + conn->nworkers = nworkers; conn->sockin = sockin; conn->sockout = sockout; pthread_mutex_init (&conn->request_lock, NULL); -- 2.13.6 _______________________________________________ Libguestfs mailing list Libguestfs@redhat.com https://www.redhat.com/mailman/listinfo/libguestfs