I want to keep more info per worker, and using a worker struct is the natural way to do this. This also allows cleaning up the ops-* interface, which accepted a uintptr_t index even though the index is never a pointer. I think the uintptr_t type is a leftover from passing the index to the worker thread through the void * argument of pthread_create.
The worker struct is used only by the multi-thread-copying module, but in a future patch I want to keep the worker pointer in the command, to allow commands to update worker state when they finish. Signed-off-by: Nir Soffer <nsof...@redhat.com> --- copy/file-ops.c | 4 +-- copy/main.c | 6 ++--- copy/multi-thread-copying.c | 49 +++++++++++++++++++------------------ copy/nbd-ops.c | 10 ++++---- copy/nbdcopy.h | 24 +++++++++++------- copy/null-ops.c | 4 +-- copy/pipe-ops.c | 2 +- 7 files changed, 53 insertions(+), 46 deletions(-) diff --git a/copy/file-ops.c b/copy/file-ops.c index aaf04ade..ab378754 100644 --- a/copy/file-ops.c +++ b/copy/file-ops.c @@ -614,27 +614,27 @@ file_asynch_zero (struct rw *rw, struct command *command, { int dummy = 0; if (!file_synch_zero (rw, command->offset, command->slice.len, allocate)) return false; cb.callback (cb.user_data, &dummy); return true; } static unsigned -file_in_flight (struct rw *rw, uintptr_t index) +file_in_flight (struct rw *rw, size_t index) { return 0; } static void -file_get_extents (struct rw *rw, uintptr_t index, +file_get_extents (struct rw *rw, size_t index, uint64_t offset, uint64_t count, extent_list *ret) { ret->len = 0; #ifdef SEEK_HOLE struct rw_file *rwf = (struct rw_file *)rw; static pthread_mutex_t lseek_lock = PTHREAD_MUTEX_INITIALIZER; if (rwf->seek_hole_supported) { diff --git a/copy/main.c b/copy/main.c index 67788b5d..390de1eb 100644 --- a/copy/main.c +++ b/copy/main.c @@ -513,44 +513,44 @@ print_rw (struct rw *rw, const char *prefix, FILE *fp) fprintf (fp, "%s: %s \"%s\"\n", prefix, rw->ops->ops_name, rw->name); fprintf (fp, "%s: size=%" PRIi64 " (%s)\n", prefix, rw->size, human_size (buf, rw->size, NULL)); } /* Default implementation of rw->ops->get_extents for backends which * don't/can't support extents. Also used for the --no-extents case. 
*/ void -default_get_extents (struct rw *rw, uintptr_t index, +default_get_extents (struct rw *rw, size_t index, uint64_t offset, uint64_t count, extent_list *ret) { struct extent e; ret->len = 0; e.offset = offset; e.length = count; e.zero = false; if (extent_list_append (ret, e) == -1) { perror ("realloc"); exit (EXIT_FAILURE); } } /* Implementations of get_polling_fd and asynch_notify_* for backends * which don't support polling. */ void -get_polling_fd_not_supported (struct rw *rw, uintptr_t index, +get_polling_fd_not_supported (struct rw *rw, size_t index, int *fd_rtn, int *direction_rtn) { /* Not an error, this causes poll to ignore the fd. */ *fd_rtn = -1; *direction_rtn = LIBNBD_AIO_DIRECTION_READ; } void -asynch_notify_read_write_not_supported (struct rw *rw, uintptr_t index) +asynch_notify_read_write_not_supported (struct rw *rw, size_t index) { /* nothing */ } diff --git a/copy/multi-thread-copying.c b/copy/multi-thread-copying.c index aa6a9f41..a1a8d09c 100644 --- a/copy/multi-thread-copying.c +++ b/copy/multi-thread-copying.c @@ -70,184 +70,185 @@ get_next_offset (uint64_t *offset, uint64_t *count) * the commands. We might move this into a callback, but those * are called from threads and not necessarily in monotonic order * so the progress bar would move erratically. */ progress_bar (*offset, src->size); } pthread_mutex_unlock (&lock); return r; } -static void *worker_thread (void *ip); +static void *worker_thread (void *wp); void multi_thread_copying (void) { - pthread_t *workers; + struct worker *workers; size_t i; int err; /* Some invariants that should be true if the main program called us * correctly. 
*/ assert (threads > 0); assert (threads == connections); /* if (src.ops == &nbd_ops) assert (src.u.nbd.handles.size == connections); if (dst.ops == &nbd_ops) assert (dst.u.nbd.handles.size == connections); */ assert (src->size != -1); - workers = malloc (sizeof (pthread_t) * threads); + workers = calloc (threads, sizeof *workers); if (workers == NULL) { - perror ("malloc"); + perror ("calloc"); exit (EXIT_FAILURE); } /* Start the worker threads. */ for (i = 0; i < threads; ++i) { - err = pthread_create (&workers[i], NULL, worker_thread, - (void *)(uintptr_t)i); + workers[i].index = i; + err = pthread_create (&workers[i].thread, NULL, worker_thread, + &workers[i]); if (err != 0) { errno = err; perror ("pthread_create"); exit (EXIT_FAILURE); } } /* Wait until all worker threads exit. */ for (i = 0; i < threads; ++i) { - err = pthread_join (workers[i], NULL); + err = pthread_join (workers[i].thread, NULL); if (err != 0) { errno = err; perror ("pthread_join"); exit (EXIT_FAILURE); } } free (workers); } -static void wait_for_request_slots (uintptr_t index); -static unsigned in_flight (uintptr_t index); -static void poll_both_ends (uintptr_t index); +static void wait_for_request_slots (size_t index); +static unsigned in_flight (size_t index); +static void poll_both_ends (size_t index); static int finished_read (void *vp, int *error); static int finished_command (void *vp, int *error); static void free_command (struct command *command); static void fill_dst_range_with_zeroes (struct command *command); static struct command *create_command (uint64_t offset, size_t len, bool zero, - uintptr_t index); + size_t index); /* There are 'threads' worker threads, each copying work ranges from * src to dst until there are no more work ranges. 
*/ static void * -worker_thread (void *indexp) +worker_thread (void *wp) { - uintptr_t index = (uintptr_t) indexp; + struct worker *w = wp; uint64_t offset, count; extent_list exts = empty_vector; while (get_next_offset (&offset, &count)) { size_t i; assert (0 < count && count <= THREAD_WORK_SIZE); if (extents) - src->ops->get_extents (src, index, offset, count, &exts); + src->ops->get_extents (src, w->index, offset, count, &exts); else - default_get_extents (src, index, offset, count, &exts); + default_get_extents (src, w->index, offset, count, &exts); for (i = 0; i < exts.len; ++i) { struct command *command; size_t len; if (exts.ptr[i].zero) { /* The source is zero so we can proceed directly to skipping, * fast zeroing, or writing zeroes at the destination. */ command = create_command (exts.ptr[i].offset, exts.ptr[i].length, - true, index); + true, w->index); fill_dst_range_with_zeroes (command); } else /* data */ { /* As the extent might be larger than permitted for a single * command, we may have to split this into multiple read * requests. */ while (exts.ptr[i].length > 0) { len = exts.ptr[i].length; if (len > request_size) len = request_size; command = create_command (exts.ptr[i].offset, len, - false, index); + false, w->index); - wait_for_request_slots (index); + wait_for_request_slots (w->index); /* Begin the asynch read operation. */ src->ops->asynch_read (src, command, (nbd_completion_callback) { .callback = finished_read, .user_data = command, }); exts.ptr[i].offset += len; exts.ptr[i].length -= len; } } offset += count; count = 0; } /* for extents */ } /* Wait for in flight NBD requests to finish. */ - while (in_flight (index) > 0) - poll_both_ends (index); + while (in_flight (w->index) > 0) + poll_both_ends (w->index); free (exts.ptr); return NULL; } /* If the number of requests in flight exceeds the limit, poll * waiting for at least one request to finish. This enforces * the user --requests option. 
* * NB: Unfortunately it's not possible to call this from a callback, * since it will deadlock trying to grab the libnbd handle lock. This * means that although the worker thread calls this and enforces the * limit, when we split up requests into subrequests (eg. doing * sparseness detection) we will probably exceed the user request * limit. XXX */ static void -wait_for_request_slots (uintptr_t index) +wait_for_request_slots (size_t index) { while (in_flight (index) >= max_requests) poll_both_ends (index); } /* Count the number of asynchronous commands in flight. */ static unsigned -in_flight (uintptr_t index) +in_flight (size_t index) { return src->ops->in_flight (src, index) + dst->ops->in_flight (dst, index); } /* Poll (optional) NBD src and NBD dst, moving the state machine(s) * along. This is a lightly modified nbd_poll. */ static void -poll_both_ends (uintptr_t index) +poll_both_ends (size_t index) { struct pollfd fds[2]; int r, direction; memset (fds, 0, sizeof fds); /* Note: if polling is not supported, this function will * set fd == -1 which poll ignores. */ src->ops->get_polling_fd (src, index, &fds[0].fd, &direction); @@ -331,21 +332,21 @@ create_buffer (size_t len) exit (EXIT_FAILURE); } buffer->refs = 1; return buffer; } /* Create a new command for read or zero. 
*/ static struct command * -create_command (uint64_t offset, size_t len, bool zero, uintptr_t index) +create_command (uint64_t offset, size_t len, bool zero, size_t index) { struct command *command; command = calloc (1, sizeof *command); if (command == NULL) { perror ("calloc"); exit (EXIT_FAILURE); } command->offset = offset; diff --git a/copy/nbd-ops.c b/copy/nbd-ops.c index 10551d3a..dca86e88 100644 --- a/copy/nbd-ops.c +++ b/copy/nbd-ops.c @@ -377,32 +377,32 @@ add_extent (void *vp, const char *metacontext, exit (EXIT_FAILURE); } offset += entries[i]; } return 0; } static unsigned -nbd_ops_in_flight (struct rw *rw, uintptr_t index) +nbd_ops_in_flight (struct rw *rw, size_t index) { struct rw_nbd *rwn = (struct rw_nbd *) rw; /* Since the commands are auto-retired in the callbacks we don't * need to count "done" commands. */ return nbd_aio_in_flight (rwn->handles.ptr[index]); } static void -nbd_ops_get_polling_fd (struct rw *rw, uintptr_t index, +nbd_ops_get_polling_fd (struct rw *rw, size_t index, int *fd, int *direction) { struct rw_nbd *rwn = (struct rw_nbd *) rw; struct nbd_handle *nbd; nbd = rwn->handles.ptr[index]; *fd = nbd_aio_get_fd (nbd); if (*fd == -1) goto error; @@ -412,47 +412,47 @@ nbd_ops_get_polling_fd (struct rw *rw, uintptr_t index, goto error; return; error: fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ()); exit (EXIT_FAILURE); } static void -nbd_ops_asynch_notify_read (struct rw *rw, uintptr_t index) +nbd_ops_asynch_notify_read (struct rw *rw, size_t index) { struct rw_nbd *rwn = (struct rw_nbd *) rw; if (nbd_aio_notify_read (rwn->handles.ptr[index]) == -1) { fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ()); exit (EXIT_FAILURE); } } static void -nbd_ops_asynch_notify_write (struct rw *rw, uintptr_t index) +nbd_ops_asynch_notify_write (struct rw *rw, size_t index) { struct rw_nbd *rwn = (struct rw_nbd *) rw; if (nbd_aio_notify_write (rwn->handles.ptr[index]) == -1) { fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ()); 
exit (EXIT_FAILURE); } } /* This is done synchronously, but that's fine because commands from * the previous work range in flight continue to run, it's difficult * to (sanely) start new work until we have the full list of extents, * and in almost every case the remote NBD server can answer our * request for extents in a single round trip. */ static void -nbd_ops_get_extents (struct rw *rw, uintptr_t index, +nbd_ops_get_extents (struct rw *rw, size_t index, uint64_t offset, uint64_t count, extent_list *ret) { struct rw_nbd *rwn = (struct rw_nbd *) rw; extent_list exts = empty_vector; struct nbd_handle *nbd; nbd = rwn->handles.ptr[index]; ret->len = 0; diff --git a/copy/nbdcopy.h b/copy/nbdcopy.h index c070f8d7..4fe8bee6 100644 --- a/copy/nbdcopy.h +++ b/copy/nbdcopy.h @@ -71,36 +71,42 @@ struct buffer { struct slice { size_t len; /* Length of slice. */ size_t base; /* Start of slice relative to buffer. */ struct buffer *buffer; /* Underlying allocation (may be shared * or NULL). */ }; #define slice_ptr(slice) ((slice).buffer->data + (slice).base) +/* Worker state used by multi-threaded copying. */ +struct worker { + pthread_t thread; + size_t index; +}; + /* Commands for asynchronous operations in flight. * * We don't store the command type (read/write/zero/etc) because it is * implicit in the function being called and because commands * naturally change from read -> write/zero/etc as they progress. * * slice.buffer may be NULL for commands (like zero) that have no * associated data. * * A separate set of commands, slices and buffers is maintained per * thread so no locking is necessary. */ struct command { uint64_t offset; /* Offset relative to start of disk. */ struct slice slice; /* Data slice. */ - uintptr_t index; /* Thread number. */ + size_t index; /* Thread number. */ }; /* List of extents for rw->ops->get_extents. 
*/ struct extent { uint64_t offset; uint64_t length; bool zero; }; DEFINE_VECTOR_TYPE(extent_list, struct extent); @@ -173,51 +179,51 @@ struct rw_ops { struct command *command, nbd_completion_callback cb); /* Asynchronously zero. command->slice.buffer is not used. If not possible, * returns false. 'cb' must be called only if returning true. */ bool (*asynch_zero) (struct rw *rw, struct command *command, nbd_completion_callback cb, bool allocate); /* Number of asynchronous commands in flight for a particular thread. */ - unsigned (*in_flight) (struct rw *rw, uintptr_t index); + unsigned (*in_flight) (struct rw *rw, size_t index); /* Get polling file descriptor and direction, and notify read/write. * For sources which cannot be polled (such as files and pipes) * get_polling_fd returns fd == -1 (NOT an error), and the * asynch_notify_* functions are no-ops. */ - void (*get_polling_fd) (struct rw *rw, uintptr_t index, + void (*get_polling_fd) (struct rw *rw, size_t index, int *fd_rtn, int *direction_rtn); - void (*asynch_notify_read) (struct rw *rw, uintptr_t index); - void (*asynch_notify_write) (struct rw *rw, uintptr_t index); + void (*asynch_notify_read) (struct rw *rw, size_t index); + void (*asynch_notify_write) (struct rw *rw, size_t index); /* Read base:allocation extents metadata for a region of the source. * For local files the same information is read from the kernel. * * Note that qemu-img fetches extents for the entire disk up front, * and we want to avoid doing that because it had very negative * behaviour for certain sources (ie. VDDK). 
*/ - void (*get_extents) (struct rw *rw, uintptr_t index, + void (*get_extents) (struct rw *rw, size_t index, uint64_t offset, uint64_t count, extent_list *ret); }; -extern void default_get_extents (struct rw *rw, uintptr_t index, +extern void default_get_extents (struct rw *rw, size_t index, uint64_t offset, uint64_t count, extent_list *ret); -extern void get_polling_fd_not_supported (struct rw *rw, uintptr_t index, +extern void get_polling_fd_not_supported (struct rw *rw, size_t index, int *fd_rtn, int *direction_rtn); extern void asynch_notify_read_write_not_supported (struct rw *rw, - uintptr_t index); + size_t index); extern bool allocated; extern unsigned connections; extern bool destination_is_zero; extern bool extents; extern bool flush; extern unsigned max_requests; extern bool progress; extern int progress_fd; extern unsigned request_size; diff --git a/copy/null-ops.c b/copy/null-ops.c index 5f1fda50..1218a623 100644 --- a/copy/null-ops.c +++ b/copy/null-ops.c @@ -126,27 +126,27 @@ static bool null_asynch_zero (struct rw *rw, struct command *command, nbd_completion_callback cb, bool allocate) { int dummy = 0; cb.callback (cb.user_data, &dummy); return true; } static unsigned -null_in_flight (struct rw *rw, uintptr_t index) +null_in_flight (struct rw *rw, size_t index) { return 0; } static void -null_get_extents (struct rw *rw, uintptr_t index, +null_get_extents (struct rw *rw, size_t index, uint64_t offset, uint64_t count, extent_list *ret) { abort (); } static struct rw_ops null_ops = { .ops_name = "null_ops", .close = null_close, .is_read_only = null_is_read_only, diff --git a/copy/pipe-ops.c b/copy/pipe-ops.c index f9b8599a..3c8b6c2b 100644 --- a/copy/pipe-ops.c +++ b/copy/pipe-ops.c @@ -147,21 +147,21 @@ pipe_asynch_write (struct rw *rw, } static bool pipe_asynch_zero (struct rw *rw, struct command *command, nbd_completion_callback cb, bool allocate) { return false; /* not supported by pipes */ } static unsigned -pipe_in_flight (struct rw *rw, 
uintptr_t index) +pipe_in_flight (struct rw *rw, size_t index) { return 0; } static struct rw_ops pipe_ops = { .ops_name = "pipe_ops", .close = pipe_close, .is_read_only = pipe_is_read_only, -- 2.35.1 _______________________________________________ Libguestfs mailing list Libguestfs@redhat.com https://listman.redhat.com/mailman/listinfo/libguestfs