On 10/28/21 16:24, Richard W.M. Jones wrote:
> Since VDDK 6.0, asynchronous read and write operations are available.
> This commit makes use of these, allowing us to use the parallel thread
> model for increased performance.
> 
> Note that at least VDDK 6.5 is required because VDDK 6.0 had a
> different and incompatible signature for VixDiskLibCompletionCB.
> 
> Also note at least vSphere 6.7 is required for asynch calls to make
> any performance difference.  In older versions they work
> synchronously.
> 
> In the parallel thread model, nbdkit will be calling us in parallel
> from multiple nbdkit threads.  VDDK does not allow multiple threads to
> simultaneously call VDDK operations on the same handle.  So we create
> a background thread per handle (== connection).
> 
> Only the background thread makes VDDK calls[1].  The background thread
> handles a mix of synchronous (like extents, flush) and asynchronous
> (like read, write) operations, but all from one thread.
> 
> Parallel nbdkit threads issue commands to the background thread
> associated with each handle, and wait until they are retired.
> 
> [1] All VDDK calls except for connecting and disconnecting which for
> different reasons are protected by a global lock, so I did not need to
> change those.
> ---
>  plugins/vddk/nbdkit-vddk-plugin.pod |  11 +-
>  plugins/vddk/Makefile.am            |   1 +
>  plugins/vddk/vddk.h                 |  41 +-
>  plugins/vddk/vddk.c                 | 380 +++++-------------
>  plugins/vddk/worker.c               | 577 ++++++++++++++++++++++++++++
>  tests/dummy-vddk.c                  |  32 ++
>  6 files changed, 747 insertions(+), 295 deletions(-)
> 
> diff --git a/plugins/vddk/nbdkit-vddk-plugin.pod 
> b/plugins/vddk/nbdkit-vddk-plugin.pod
> index 1c16d0969..ce82a7342 100644
> --- a/plugins/vddk/nbdkit-vddk-plugin.pod
> +++ b/plugins/vddk/nbdkit-vddk-plugin.pod
> @@ -523,6 +523,14 @@ read bandwidth to the VMware server.
>  
>  Same as above, but for writing and flushing writes.
>  
> +=item C<ReadAsync>
> +
> +=item C<WriteAsync>
> +
> +Same as above, but for asynchronous read and write calls introduced in
> +nbdkit 1.30.  Unfortunately at the moment the amount of time spent in
> +these calls is not accounted for correctly.
> +
>  =item C<QueryAllocatedBlocks>
>  
>  This call is used to query information about the sparseness of the
> @@ -580,7 +588,8 @@ Debug extents returned by C<QueryAllocatedBlocks>.
>  
>  =item B<-D vddk.datapath=0>
>  
> -Suppress debugging of datapath calls (C<Read> and C<Write>).
> +Suppress debugging of datapath calls (C<Read>, C<ReadAsync>, C<Write>
> +and C<WriteAsync>).
>  
>  =item B<-D vddk.stats=1>
>  
> diff --git a/plugins/vddk/Makefile.am b/plugins/vddk/Makefile.am
> index 4f470ff9e..f8382fc91 100644
> --- a/plugins/vddk/Makefile.am
> +++ b/plugins/vddk/Makefile.am
> @@ -49,6 +49,7 @@ nbdkit_vddk_plugin_la_SOURCES = \
>       stats.c \
>       vddk-structs.h \
>       vddk-stubs.h \
> +     worker.c \
>       $(top_srcdir)/include/nbdkit-plugin.h \
>       $(NULL)
>  
> diff --git a/plugins/vddk/vddk.h b/plugins/vddk/vddk.h
> index 1400589dd..610fc8b77 100644
> --- a/plugins/vddk/vddk.h
> +++ b/plugins/vddk/vddk.h
> @@ -90,7 +90,9 @@ extern int vddk_debug_stats;
>    /* GCC can optimize this away at compile time: */                     \
>    const bool datapath =                                                 \
>      strcmp (#fn, "VixDiskLib_Read") == 0 ||                             \
> -    strcmp (#fn, "VixDiskLib_Write") == 0;                              \
> +    strcmp (#fn, "VixDiskLib_ReadAsync") == 0 ||                        \
> +    strcmp (#fn, "VixDiskLib_Write") == 0 ||                            \
> +    strcmp (#fn, "VixDiskLib_WriteAsync") == 0;                         \
>    if (vddk_debug_stats)                                                 \
>      gettimeofday (&start_t, NULL);                                      \
>    if (!datapath || vddk_debug_datapath)                                 \
> @@ -120,6 +122,38 @@ extern int vddk_debug_stats;
>      VDDK_CALL_END (VixDiskLib_FreeErrorText, 0);                \
>    } while (0)
>  
> +/* Queue of asynchronous commands sent to the background thread. */
> +enum command_type { GET_SIZE, READ, WRITE, FLUSH, CAN_EXTENTS, EXTENTS, STOP 
> };
> +struct command {
> +  /* These fields are set by the caller. */
> +  enum command_type type;       /* command */
> +  void *ptr;                    /* buffer, extents list, return values */
> +  uint32_t count;               /* READ, WRITE, EXTENTS */
> +  uint64_t offset;              /* READ, WRITE, EXTENTS */
> +  bool req_one;                 /* EXTENTS NBDKIT_FLAG_REQ_ONE */
> +
> +  /* This field is set to a unique value by send_command_and_wait. */
> +  uint64_t id;                  /* serial number */
> +
> +  /* These fields are used by the internal implementation. */
> +  pthread_mutex_t mutex;        /* completion mutex */
> +  pthread_cond_t cond;          /* completion condition */
> +  enum { SUBMITTED, SUCCEEDED, FAILED } status;
> +};
> +
> +DEFINE_VECTOR_TYPE(command_queue, struct command *)
> +
> +/* The per-connection handle. */
> +struct vddk_handle {
> +  VixDiskLibConnectParams *params; /* connection parameters */
> +  VixDiskLibConnection connection; /* connection */
> +  VixDiskLibHandle handle;         /* disk handle */
> +  pthread_t thread;                /* background thread for asynch work */
> +  command_queue commands;          /* commands for background thread */
> +  pthread_mutex_t commands_lock;   /* lock on command queue */
> +  pthread_cond_t commands_cond;    /* condition (queue size 0 -> 1) */
> +};
> +
>  /* reexec.c */
>  extern bool noreexec;
>  extern char *reexeced;
> @@ -141,4 +175,9 @@ extern pthread_mutex_t stats_lock;
>  #undef OPTIONAL_STUB
>  extern void display_stats (void);
>  
> +/* worker.c */
> +extern const char *command_type_string (enum command_type type);
> +extern int send_command_and_wait (struct vddk_handle *h, struct command 
> *cmd);
> +extern void *vddk_worker_thread (void *handle);
> +
>  #endif /* NBDKIT_VDDK_H */
> diff --git a/plugins/vddk/vddk.c b/plugins/vddk/vddk.c
> index c05dbfccc..cd3c31349 100644
> --- a/plugins/vddk/vddk.c
> +++ b/plugins/vddk/vddk.c
> @@ -50,9 +50,6 @@
>  #include <nbdkit-plugin.h>
>  
>  #include "cleanup.h"
> -#include "minmax.h"
> -#include "rounding.h"
> -#include "tvdiff.h"
>  #include "vector.h"
>  
>  #include "vddk.h"
> @@ -522,23 +519,18 @@ vddk_dump_plugin (void)
>  /* The rules on threads and VDDK are here:
>   * 
> https://code.vmware.com/docs/11750/virtual-disk-development-kit-programming-guide/GUID-6BE903E8-DC70-46D9-98E4-E34A2002C2AD.html
>   *
> - * Before nbdkit 1.22 we used SERIALIZE_ALL_REQUESTS.  Since nbdkit
> - * 1.22 we changed this to SERIALIZE_REQUESTS and added a mutex around
> - * calls to VixDiskLib_Open and VixDiskLib_Close.  This is not quite
> - * within the letter of the rules, but is within the spirit.
> + * Before nbdkit 1.22 we used SERIALIZE_ALL_REQUESTS.  In nbdkit
> + * 1.22-1.28 we changed this to SERIALIZE_REQUESTS and added a mutex
> + * around calls to VixDiskLib_Open and VixDiskLib_Close.  In nbdkit
> + * 1.30 and above we assign a background thread per connection to do
> + * asynch operations and use the PARALLEL model.  We still need the
> + * lock around Open and Close.
>   */
> -#define THREAD_MODEL NBDKIT_THREAD_MODEL_SERIALIZE_REQUESTS
> +#define THREAD_MODEL NBDKIT_THREAD_MODEL_PARALLEL
>  
>  /* Lock protecting open/close calls - see above. */
>  static pthread_mutex_t open_close_lock = PTHREAD_MUTEX_INITIALIZER;
>  
> -/* The per-connection handle. */
> -struct vddk_handle {
> -  VixDiskLibConnectParams *params; /* connection parameters */
> -  VixDiskLibConnection connection; /* connection */
> -  VixDiskLibHandle handle;         /* disk handle */
> -};
> -
>  static inline VixDiskLibConnectParams *
>  allocate_connect_params (void)
>  {
> @@ -579,12 +571,16 @@ vddk_open (int readonly)
>    VixError err;
>    uint32_t flags;
>    const char *transport_mode;
> +  int pterr;
>  
> -  h = malloc (sizeof *h);
> +  h = calloc (1, sizeof *h);
>    if (h == NULL) {
> -    nbdkit_error ("malloc: %m");
> +    nbdkit_error ("calloc: %m");
>      return NULL;
>    }
> +  h->commands = (command_queue) empty_vector;
> +  pthread_mutex_init (&h->commands_lock, NULL);
> +  pthread_cond_init (&h->commands_cond, NULL);
>  
>    h->params = allocate_connect_params ();
>    if (h->params == NULL) {
> @@ -661,8 +657,22 @@ vddk_open (int readonly)
>    VDDK_CALL_END (VixDiskLib_GetTransportMode, 0);
>    nbdkit_debug ("transport mode: %s", transport_mode);
>  
> +  /* Start the background thread which actually does the asynchronous
> +   * work.
> +   */
> +  pterr = pthread_create (&h->thread, NULL, vddk_worker_thread, h);
> +  if (pterr != 0) {
> +    errno = pterr;
> +    nbdkit_error ("pthread_create: %m");
> +    goto err3;
> +  }
> +
>    return h;
>  
> + err3:
> +  VDDK_CALL_START (VixDiskLib_Close, "handle")
> +    VixDiskLib_Close (h->handle);
> +  VDDK_CALL_END (VixDiskLib_Close, 0);
>   err2:
>    VDDK_CALL_START (VixDiskLib_Disconnect, "connection")
>      VixDiskLib_Disconnect (h->connection);
> @@ -670,6 +680,8 @@ vddk_open (int readonly)
>   err1:
>    free_connect_params (h->params);
>   err0:
> +  pthread_mutex_destroy (&h->commands_lock);
> +  pthread_cond_destroy (&h->commands_cond);
>    free (h);
>    return NULL;
>  }
> @@ -680,6 +692,10 @@ vddk_close (void *handle)
>  {
>    ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&open_close_lock);
>    struct vddk_handle *h = handle;
> +  struct command stop_cmd = { .type = STOP };
> +
> +  send_command_and_wait (h, &stop_cmd);
> +  pthread_join (h->thread, NULL);
>  
>    VDDK_CALL_START (VixDiskLib_Close, "handle")
>      VixDiskLib_Close (h->handle);
> @@ -689,6 +705,9 @@ vddk_close (void *handle)
>    VDDK_CALL_END (VixDiskLib_Disconnect, 0);
>  
>    free_connect_params (h->params);
> +  pthread_mutex_destroy (&h->commands_lock);
> +  pthread_cond_destroy (&h->commands_cond);
> +  command_queue_reset (&h->commands);
>    free (h);
>  }
>  
> @@ -697,54 +716,29 @@ static int64_t
>  vddk_get_size (void *handle)
>  {
>    struct vddk_handle *h = handle;
> -  VixDiskLibInfo *info;
> -  VixError err;
>    uint64_t size;
> +  struct command get_size_cmd = { .type = GET_SIZE, .ptr = &size };
>  
> -  VDDK_CALL_START (VixDiskLib_GetInfo, "handle, &info")
> -    err = VixDiskLib_GetInfo (h->handle, &info);
> -  VDDK_CALL_END (VixDiskLib_GetInfo, 0);
> -  if (err != VIX_OK) {
> -    VDDK_ERROR (err, "VixDiskLib_GetInfo");
> +  if (send_command_and_wait (h, &get_size_cmd) == -1)
>      return -1;
> -  }
> -
> -  size = info->capacity * (uint64_t)VIXDISKLIB_SECTOR_SIZE;
> -
> -  if (vddk_debug_diskinfo) {
> -    nbdkit_debug ("disk info: capacity: %" PRIu64 " sectors "
> -                  "(%" PRIi64 " bytes)",
> -                  info->capacity, size);
> -    nbdkit_debug ("disk info: biosGeo: C:%" PRIu32 " H:%" PRIu32 " S:%" 
> PRIu32,
> -                  info->biosGeo.cylinders,
> -                  info->biosGeo.heads,
> -                  info->biosGeo.sectors);
> -    nbdkit_debug ("disk info: physGeo: C:%" PRIu32 " H:%" PRIu32 " S:%" 
> PRIu32,
> -                  info->physGeo.cylinders,
> -                  info->physGeo.heads,
> -                  info->physGeo.sectors);
> -    nbdkit_debug ("disk info: adapter type: %d",
> -                  (int) info->adapterType);
> -    nbdkit_debug ("disk info: num links: %d", info->numLinks);
> -    nbdkit_debug ("disk info: parent filename hint: %s",
> -                  info->parentFileNameHint ? : "NULL");
> -    nbdkit_debug ("disk info: uuid: %s",
> -                  info->uuid ? : "NULL");
> -    if (library_version >= 7) {
> -      nbdkit_debug ("disk info: sector size: "
> -                    "logical %" PRIu32 " physical %" PRIu32,
> -                    info->logicalSectorSize,
> -                    info->physicalSectorSize);
> -    }
> -  }
> -
> -  VDDK_CALL_START (VixDiskLib_FreeInfo, "info")
> -    VixDiskLib_FreeInfo (info);
> -  VDDK_CALL_END (VixDiskLib_FreeInfo, 0);
>  
>    return (int64_t) size;
>  }
>  
> +static int
> +vddk_can_fua (void *handle)
> +{
> +  /* The Flush call was not available in VDDK < 6.0. */
> +  return VixDiskLib_Flush != NULL ? NBDKIT_FUA_NATIVE : NBDKIT_FUA_NONE;
> +}
> +
> +static int
> +vddk_can_flush (void *handle)
> +{
> +  /* The Flush call was not available in VDDK < 6.0. */
> +  return VixDiskLib_Flush != NULL;
> +}
> +
>  /* Read data from the file.
>   *
>   * Note that reads have to be aligned to sectors (XXX).
> @@ -754,32 +748,14 @@ vddk_pread (void *handle, void *buf, uint32_t count, 
> uint64_t offset,
>              uint32_t flags)
>  {
>    struct vddk_handle *h = handle;
> -  VixError err;
> +  struct command read_cmd = {
> +    .type = READ,
> +    .ptr = buf,
> +    .count = count,
> +    .offset = offset,
> +  };
>  
> -  /* Align to sectors. */
> -  if (!IS_ALIGNED (offset, VIXDISKLIB_SECTOR_SIZE)) {
> -    nbdkit_error ("%s is not aligned to sectors", "read");
> -    return -1;
> -  }
> -  if (!IS_ALIGNED (count, VIXDISKLIB_SECTOR_SIZE)) {
> -    nbdkit_error ("%s is not aligned to sectors", "read");
> -    return -1;
> -  }
> -  offset /= VIXDISKLIB_SECTOR_SIZE;
> -  count /= VIXDISKLIB_SECTOR_SIZE;
> -
> -  VDDK_CALL_START (VixDiskLib_Read,
> -                   "handle, %" PRIu64 " sectors, "
> -                   "%" PRIu32 " sectors, buffer",
> -                   offset, count)
> -    err = VixDiskLib_Read (h->handle, offset, count, buf);
> -  VDDK_CALL_END (VixDiskLib_Read, count * VIXDISKLIB_SECTOR_SIZE);
> -  if (err != VIX_OK) {
> -    VDDK_ERROR (err, "VixDiskLib_Read");
> -    return -1;
> -  }
> -
> -  return 0;
> +  return send_command_and_wait (h, &read_cmd);
>  }
>  
>  static int vddk_flush (void *handle, uint32_t flags);
> @@ -792,32 +768,17 @@ static int
>  vddk_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset,
>               uint32_t flags)
>  {
> +  struct vddk_handle *h = handle;
>    const bool fua = flags & NBDKIT_FLAG_FUA;
> -  struct vddk_handle *h = handle;
> -  VixError err;
> +  struct command write_cmd = {
> +    .type = WRITE,
> +    .ptr = (void *) buf,
> +    .count = count,
> +    .offset = offset,
> +  };
>  
> -  /* Align to sectors. */
> -  if (!IS_ALIGNED (offset, VIXDISKLIB_SECTOR_SIZE)) {
> -    nbdkit_error ("%s is not aligned to sectors", "write");
> +  if (send_command_and_wait (h, &write_cmd) == -1)
>      return -1;
> -  }
> -  if (!IS_ALIGNED (count, VIXDISKLIB_SECTOR_SIZE)) {
> -    nbdkit_error ("%s is not aligned to sectors", "write");
> -    return -1;
> -  }
> -  offset /= VIXDISKLIB_SECTOR_SIZE;
> -  count /= VIXDISKLIB_SECTOR_SIZE;
> -
> -  VDDK_CALL_START (VixDiskLib_Write,
> -                   "handle, %" PRIu64 " sectors, "
> -                   "%" PRIu32 " sectors, buffer",
> -                   offset, count)
> -    err = VixDiskLib_Write (h->handle, offset, count, buf);
> -  VDDK_CALL_END (VixDiskLib_Write, count * VIXDISKLIB_SECTOR_SIZE);
> -  if (err != VIX_OK) {
> -    VDDK_ERROR (err, "VixDiskLib_Write");
> -    return -1;
> -  }
>  
>    if (fua) {
>      if (vddk_flush (handle, 0) == -1)
> @@ -827,126 +788,32 @@ vddk_pwrite (void *handle, const void *buf, uint32_t 
> count, uint64_t offset,
>    return 0;
>  }
>  
> -static int
> -vddk_can_fua (void *handle)
> -{
> -  /* The Flush call was not available in VDDK < 6.0. */
> -  return VixDiskLib_Flush != NULL ? NBDKIT_FUA_NATIVE : NBDKIT_FUA_NONE;
> -}
> -
> -static int
> -vddk_can_flush (void *handle)
> -{
> -  /* The Flush call was not available in VDDK < 6.0. */
> -  return VixDiskLib_Flush != NULL;
> -}
> -
>  /* Flush data to the file. */
>  static int
>  vddk_flush (void *handle, uint32_t flags)
>  {
>    struct vddk_handle *h = handle;
> -  VixError err;
> +  struct command flush_cmd = {
> +    .type = FLUSH,
> +  };
>  
> -  /* The documentation for Flush is missing, but the comment in the
> -   * header file seems to indicate that it waits for WriteAsync
> -   * commands to finish.  We don't use WriteAsync, and in any case
> -   * there's a new function Wait to wait for those.  However I
> -   * verified using strace that in fact Flush does call fsync on the
> -   * file so it appears to be the correct call to use here.
> -   */
> -
> -  VDDK_CALL_START (VixDiskLib_Flush, "handle")
> -    err = VixDiskLib_Flush (h->handle);
> -  VDDK_CALL_END (VixDiskLib_Flush, 0);
> -  if (err != VIX_OK) {
> -    VDDK_ERROR (err, "VixDiskLib_Flush");
> -    return -1;
> -  }
> -
> -  return 0;
> +  return send_command_and_wait (h, &flush_cmd);
>  }
>  
>  static int
>  vddk_can_extents (void *handle)
>  {
>    struct vddk_handle *h = handle;
> -  VixError err;
> -  VixDiskLibBlockList *block_list;
> +  int ret;
> +  struct command can_extents_cmd = {
> +    .type = CAN_EXTENTS,
> +    .ptr = &ret,
> +  };
>  
> -  /* This call was added in VDDK 6.7.  In earlier versions the
> -   * function pointer will be NULL and we cannot query extents.
> -   */
> -  if (VixDiskLib_QueryAllocatedBlocks == NULL) {
> -    nbdkit_debug ("can_extents: VixDiskLib_QueryAllocatedBlocks == NULL, "
> -                  "probably this is VDDK < 6.7");
> -    return 0;
> -  }
> -
> -  /* Suppress errors around this call.  See:
> -   * https://bugzilla.redhat.com/show_bug.cgi?id=1709211#c7
> -   */
> -  error_suppression = 1;
> -
> -  /* However even when the call is available it rarely works well so
> -   * the best thing we can do here is to try the call and if it's
> -   * non-functional return false.
> -   */
> -  VDDK_CALL_START (VixDiskLib_QueryAllocatedBlocks,
> -                   "handle, 0, %d sectors, %d sectors",
> -                   VIXDISKLIB_MIN_CHUNK_SIZE, VIXDISKLIB_MIN_CHUNK_SIZE)
> -    err = VixDiskLib_QueryAllocatedBlocks (h->handle,
> -                                           0, VIXDISKLIB_MIN_CHUNK_SIZE,
> -                                           VIXDISKLIB_MIN_CHUNK_SIZE,
> -                                           &block_list);
> -  VDDK_CALL_END (VixDiskLib_QueryAllocatedBlocks, 0);
> -  error_suppression = 0;
> -  if (err == VIX_OK) {
> -    VDDK_CALL_START (VixDiskLib_FreeBlockList, "block_list")
> -      VixDiskLib_FreeBlockList (block_list);
> -    VDDK_CALL_END (VixDiskLib_FreeBlockList, 0);
> -  }
> -  if (err != VIX_OK) {
> -    char *errmsg = VixDiskLib_GetErrorText (err, NULL);
> -    nbdkit_debug ("can_extents: VixDiskLib_QueryAllocatedBlocks test failed, 
> "
> -                  "extents support will be disabled: "
> -                  "original error: %s",
> -                  errmsg);
> -    VixDiskLib_FreeErrorText (errmsg);
> -    return 0;
> -  }
> -
> -  return 1;
> -}
> -
> -static int
> -add_extent (struct nbdkit_extents *extents,
> -            uint64_t *position, uint64_t next_position, bool is_hole)
> -{
> -  uint32_t type = 0;
> -  const uint64_t length = next_position - *position;
> -
> -  if (is_hole) {
> -    type = NBDKIT_EXTENT_HOLE;
> -    /* Images opened as single link might be backed by another file in the
> -       chain, so the holes are not guaranteed to be zeroes. */
> -    if (!single_link)
> -      type |= NBDKIT_EXTENT_ZERO;
> -  }
> -
> -  assert (*position <= next_position);
> -  if (*position == next_position)
> -    return 0;
> -
> -  if (vddk_debug_extents)
> -    nbdkit_debug ("adding extent type %s at [%" PRIu64 "...%" PRIu64 "]",
> -                  is_hole ? "hole" : "allocated data",
> -                  *position, next_position-1);
> -  if (nbdkit_add_extent (extents, *position, length, type) == -1)
> +  if (send_command_and_wait (h, &can_extents_cmd) == -1)
>      return -1;
>  
> -  *position = next_position;
> -  return 0;
> +  return ret;
>  }
>  
>  static int
> @@ -955,88 +822,15 @@ vddk_extents (void *handle, uint32_t count, uint64_t 
> offset, uint32_t flags,
>  {
>    struct vddk_handle *h = handle;
>    bool req_one = flags & NBDKIT_FLAG_REQ_ONE;
> -  uint64_t position, end, start_sector;
> -
> -  position = offset;
> -  end = offset + count;
> -
> -  /* We can only query whole chunks.  Therefore start with the first
> -   * chunk before offset.
> -   */
> -  start_sector =
> -    ROUND_DOWN (offset, VIXDISKLIB_MIN_CHUNK_SIZE * VIXDISKLIB_SECTOR_SIZE)
> -    / VIXDISKLIB_SECTOR_SIZE;
> -  while (start_sector * VIXDISKLIB_SECTOR_SIZE < end) {
> -    VixError err;
> -    uint32_t i;
> -    uint64_t nr_chunks, nr_sectors;
> -    VixDiskLibBlockList *block_list;
> -
> -    assert (IS_ALIGNED (start_sector, VIXDISKLIB_MIN_CHUNK_SIZE));
> -
> -    nr_chunks =
> -      ROUND_UP (end - start_sector * VIXDISKLIB_SECTOR_SIZE,
> -                VIXDISKLIB_MIN_CHUNK_SIZE * VIXDISKLIB_SECTOR_SIZE)
> -      / (VIXDISKLIB_MIN_CHUNK_SIZE * VIXDISKLIB_SECTOR_SIZE);
> -    nr_chunks = MIN (nr_chunks, VIXDISKLIB_MAX_CHUNK_NUMBER);
> -    nr_sectors = nr_chunks * VIXDISKLIB_MIN_CHUNK_SIZE;
> -
> -    VDDK_CALL_START (VixDiskLib_QueryAllocatedBlocks,
> -                     "handle, %" PRIu64 " sectors, %" PRIu64 " sectors, "
> -                     "%d sectors",
> -                     start_sector, nr_sectors, VIXDISKLIB_MIN_CHUNK_SIZE)
> -      err = VixDiskLib_QueryAllocatedBlocks (h->handle,
> -                                             start_sector, nr_sectors,
> -                                             VIXDISKLIB_MIN_CHUNK_SIZE,
> -                                             &block_list);
> -    VDDK_CALL_END (VixDiskLib_QueryAllocatedBlocks, 0);
> -    if (err != VIX_OK) {
> -      VDDK_ERROR (err, "VixDiskLib_QueryAllocatedBlocks");
> -      return -1;
> -    }
> -
> -    for (i = 0; i < block_list->numBlocks; ++i) {
> -      uint64_t blk_offset, blk_length;
> -
> -      blk_offset = block_list->blocks[i].offset * VIXDISKLIB_SECTOR_SIZE;
> -      blk_length = block_list->blocks[i].length * VIXDISKLIB_SECTOR_SIZE;
> -
> -      /* The query returns allocated blocks.  We must insert holes
> -       * between the blocks as necessary.
> -       */
> -      if ((position < blk_offset &&
> -           add_extent (extents, &position, blk_offset, true) == -1) ||
> -          (add_extent (extents,
> -                       &position, blk_offset + blk_length, false) == -1)) {
> -        VDDK_CALL_START (VixDiskLib_FreeBlockList, "block_list")
> -          VixDiskLib_FreeBlockList (block_list);
> -        VDDK_CALL_END (VixDiskLib_FreeBlockList, 0);
> -        return -1;
> -      }
> -    }
> -    VDDK_CALL_START (VixDiskLib_FreeBlockList, "block_list")
> -      VixDiskLib_FreeBlockList (block_list);
> -    VDDK_CALL_END (VixDiskLib_FreeBlockList, 0);
> -
> -    /* There's an implicit hole after the returned list of blocks, up
> -     * to the end of the QueryAllocatedBlocks request.
> -     */
> -    if (add_extent (extents,
> -                    &position,
> -                    (start_sector + nr_sectors) * VIXDISKLIB_SECTOR_SIZE,
> -                    true) == -1)
> -      return -1;
> -
> -    start_sector += nr_sectors;
> -
> -    /* If one extent was requested, as long as we've added an extent
> -     * overlapping the original offset we're done.
> -     */
> -    if (req_one && position > offset)
> -      break;
> -  }
> -
> -  return 0;
> +  struct command extents_cmd = {
> +    .type = EXTENTS,
> +    .ptr = extents,
> +    .count = count,
> +    .offset = offset,
> +    .req_one = req_one,
> +  };
> +
> +  return send_command_and_wait (h, &extents_cmd);
>  }
>  
>  static struct nbdkit_plugin plugin = {
> diff --git a/plugins/vddk/worker.c b/plugins/vddk/worker.c
> new file mode 100644
> index 000000000..7a7f6a333
> --- /dev/null
> +++ b/plugins/vddk/worker.c
> @@ -0,0 +1,577 @@
> +/* nbdkit
> + * Copyright (C) 2013-2021 Red Hat Inc.
> + *
> + * Redistribution and use in source and binary forms, with or without
> + * modification, are permitted provided that the following conditions are
> + * met:
> + *
> + * * Redistributions of source code must retain the above copyright
> + * notice, this list of conditions and the following disclaimer.
> + *
> + * * Redistributions in binary form must reproduce the above copyright
> + * notice, this list of conditions and the following disclaimer in the
> + * documentation and/or other materials provided with the distribution.
> + *
> + * * Neither the name of Red Hat nor the names of its contributors may be
> + * used to endorse or promote products derived from this software without
> + * specific prior written permission.
> + *
> + * THIS SOFTWARE IS PROVIDED BY RED HAT AND CONTRIBUTORS ''AS IS'' AND
> + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
> + * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
> + * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL RED HAT OR
> + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
> + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
> + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
> + * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
> + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
> + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
> + * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
> + * SUCH DAMAGE.
> + */
> +
> +#include <config.h>
> +
> +#include <stdio.h>
> +#include <stdlib.h>
> +#include <stdint.h>
> +#include <inttypes.h>
> +
> +#ifdef HAVE_STDATOMIC_H
> +#include <stdatomic.h>
> +#else
> +/* Some old platforms lack atomic types, but this is only used for
> + * debug messages.
> + */
> +#define _Atomic /**/
> +#endif
> +
> +#include <pthread.h>
> +
> +#define NBDKIT_API_VERSION 2
> +#include <nbdkit-plugin.h>
> +
> +#include "cleanup.h"
> +#include "minmax.h"
> +#include "rounding.h"
> +#include "vector.h"
> +
> +#include "vddk.h"
> +
> +const char *
> +command_type_string (enum command_type type)
> +{
> +  switch (type) {
> +  case GET_SIZE:    return "get_size";
> +  case READ:        return "read";
> +  case WRITE:       return "write";
> +  case FLUSH:       return "flush";
> +  case CAN_EXTENTS: return "can_extents";
> +  case EXTENTS:     return "extents";
> +  case STOP:        return "stop";
> +  default:          abort ();
> +  }
> +}
> +
> +/* Send command to the background thread and wait for completion.
> + *
> + * Returns 0 for OK
> + * On error, calls nbdkit_error and returns -1.
> + */
> +int
> +send_command_and_wait (struct vddk_handle *h, struct command *cmd)
> +{
> +  static _Atomic uint64_t id = 0;

Ugh.

_Atomic is from C11, and it comes with a huge amount of theoretical
baggage (section "5.1.2.4 Multi-threaded executions and data races").
Do we really need this? For example:

> +
> +  cmd->id = id++;

Here I need to read up on the postfix increment operator, and find that:

  Postfix ++ on an object with atomic type is a read-modify-write
  operation with memory_order_seq_cst memory order semantics. 98)

with footnote 98 saying

  Where a pointer to an atomic object can be formed and E has integer
  type, E++ is equivalent to the following code sequence where T is the
  type of E:

  T *addr = &E;
  T old = *addr;
  T new;
  do {
    new = old + 1;
  } while (!atomic_compare_exchange_strong(addr, &old, new));

  with old being the result of the operation.

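So ultimately it seems safe... assuming we HAVE_STDATOMIC_H anyway.
(Without it, _Atomic is #defined away above, so "id++" becomes a plain,
unsynchronized read-modify-write on a static object that is incremented
before the command queue lock is taken.)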

(1) I still think this is an unnecessary complication. We're just about
to take the command queue lock anyway. Why not turn the current static
_Atomic "id" object into a member of the "vddk_handle" structure? The
command queue mutex would then protect "id" as well, so we could safely
fetch/increment it upon command submission, without C11 atomics.
(Admittedly, different VDDK handles would have separate ID streams then.)


> +
> +  /* Add the command to the command queue. */
> +  {
> +    ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&h->commands_lock);
> +
> +    if (command_queue_append (&h->commands, cmd) == -1)
> +      /* On error command_queue_append will call nbdkit_error. */
> +      return -1;
> +
> +    pthread_cond_signal (&h->commands_cond);

I rest my case regarding conditional signaling :)

... The rest looks good to me. If you really like the _Atomic thing, I
can ACK this version; otherwise I'd be glad to ACK v3.

Thanks!
Laszlo

> +
> +    /* This will be used to signal command completion back to us. */
> +    pthread_mutex_init (&cmd->mutex, NULL);
> +    pthread_cond_init (&cmd->cond, NULL);
> +  }
> +
> +  /* Wait for the command to be completed by the background thread. */
> +  {
> +    ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&cmd->mutex);
> +    while (cmd->status == SUBMITTED)
> +      pthread_cond_wait (&cmd->cond, &cmd->mutex);
> +  }
> +
> +  pthread_mutex_destroy (&cmd->mutex);
> +  pthread_cond_destroy (&cmd->cond);
> +
> +  /* On error the background thread will call nbdkit_error. */
> +  switch (cmd->status) {
> +  case SUCCEEDED: return 0;
> +  case FAILED:    return -1;
> +  default:        abort ();
> +  }
> +}

Seems OK to me.


> +
> +/* Asynchronous commands are completed when this function is called. */
> +static void
> +complete_command (void *vp, VixError result)
> +{
> +  struct command *cmd = vp;
> +
> +  if (vddk_debug_datapath)
> +    nbdkit_debug ("command %" PRIu64 " completed", cmd->id);
> +
> +  ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&cmd->mutex);
> +
> +  if (result == VIX_OK) {
> +    cmd->status = SUCCEEDED;
> +  } else {
> +    VDDK_ERROR (result, "command %" PRIu64 ": asynchronous %s failed",
> +                cmd->id, command_type_string (cmd->type));
> +    cmd->status = FAILED;
> +  }
> +
> +  pthread_cond_signal (&cmd->cond);
> +}
> +
> +/* Wait for any asynchronous commands to complete. */
> +static int
> +do_stop (struct command *cmd, struct vddk_handle *h)
> +{
> +  VixError err;
> +
> +  /* Because we assume VDDK >= 6.5, VixDiskLib_Wait must exist. */
> +  VDDK_CALL_START (VixDiskLib_Wait, "handle")
> +    err = VixDiskLib_Wait (h->handle);
> +  VDDK_CALL_END (VixDiskLib_Wait, 0);
> +  if (err != VIX_OK) {
> +    VDDK_ERROR (err, "VixDiskLib_Wait");
> +    /* In the end this error indication is ignored because it only
> +     * happens on the close path when we cannot handle errors.
> +     */
> +    return -1;
> +  }
> +  return 0;
> +}
> +
> +/* Get size command. */
> +static int64_t
> +do_get_size (struct command *cmd, struct vddk_handle *h)
> +{
> +  VixError err;
> +  VixDiskLibInfo *info;
> +  uint64_t size;
> +
> +  VDDK_CALL_START (VixDiskLib_GetInfo, "handle, &info")
> +    err = VixDiskLib_GetInfo (h->handle, &info);
> +  VDDK_CALL_END (VixDiskLib_GetInfo, 0);
> +  if (err != VIX_OK) {
> +    VDDK_ERROR (err, "VixDiskLib_GetInfo");
> +    return -1;
> +  }
> +
> +  size = info->capacity * (uint64_t)VIXDISKLIB_SECTOR_SIZE;
> +
> +  if (vddk_debug_diskinfo) {
> +    nbdkit_debug ("disk info: capacity: %" PRIu64 " sectors "
> +                  "(%" PRIi64 " bytes)",
> +                  info->capacity, size);
> +    nbdkit_debug ("disk info: biosGeo: C:%" PRIu32 " H:%" PRIu32 " S:%" 
> PRIu32,
> +                  info->biosGeo.cylinders,
> +                  info->biosGeo.heads,
> +                  info->biosGeo.sectors);
> +    nbdkit_debug ("disk info: physGeo: C:%" PRIu32 " H:%" PRIu32 " S:%" 
> PRIu32,
> +                  info->physGeo.cylinders,
> +                  info->physGeo.heads,
> +                  info->physGeo.sectors);
> +    nbdkit_debug ("disk info: adapter type: %d",
> +                  (int) info->adapterType);
> +    nbdkit_debug ("disk info: num links: %d", info->numLinks);
> +    nbdkit_debug ("disk info: parent filename hint: %s",
> +                  info->parentFileNameHint ? : "NULL");
> +    nbdkit_debug ("disk info: uuid: %s",
> +                  info->uuid ? : "NULL");
> +    if (library_version >= 7) {
> +      nbdkit_debug ("disk info: sector size: "
> +                    "logical %" PRIu32 " physical %" PRIu32,
> +                    info->logicalSectorSize,
> +                    info->physicalSectorSize);
> +    }
> +  }
> +
> +  VDDK_CALL_START (VixDiskLib_FreeInfo, "info")
> +    VixDiskLib_FreeInfo (info);
> +  VDDK_CALL_END (VixDiskLib_FreeInfo, 0);
> +
> +  return (int64_t) size;
> +}
> +
> +static int
> +do_read (struct command *cmd, struct vddk_handle *h)
> +{
> +  VixError err;
> +  uint32_t count = cmd->count;
> +  uint64_t offset = cmd->offset;
> +  void *buf = cmd->ptr;
> +
> +  /* Align to sectors. */
> +  if (!IS_ALIGNED (offset, VIXDISKLIB_SECTOR_SIZE)) {
> +    nbdkit_error ("%s is not aligned to sectors", "read");
> +    return -1;
> +  }
> +  if (!IS_ALIGNED (count, VIXDISKLIB_SECTOR_SIZE)) {
> +    nbdkit_error ("%s is not aligned to sectors", "read");
> +    return -1;
> +  }
> +  offset /= VIXDISKLIB_SECTOR_SIZE;
> +  count /= VIXDISKLIB_SECTOR_SIZE;
> +
> +  VDDK_CALL_START (VixDiskLib_ReadAsync,
> +                   "handle, %" PRIu64 " sectors, "
> +                   "%" PRIu32 " sectors, buffer, callback, %" PRIu64,
> +                   offset, count, cmd->id)
> +    err = VixDiskLib_ReadAsync (h->handle, offset, count, buf,
> +                                complete_command, cmd);
> +  VDDK_CALL_END (VixDiskLib_ReadAsync, count * VIXDISKLIB_SECTOR_SIZE);
> +  if (err != VIX_ASYNC) {
> +    VDDK_ERROR (err, "VixDiskLib_ReadAsync");
> +    return -1;
> +  }
> +
> +  return 0;
> +}
> +
> +static int
> +do_write (struct command *cmd, struct vddk_handle *h)
> +{
> +  VixError err;
> +  uint32_t count = cmd->count;
> +  uint64_t offset = cmd->offset;
> +  const void *buf = cmd->ptr;
> +
> +  /* Align to sectors. */
> +  if (!IS_ALIGNED (offset, VIXDISKLIB_SECTOR_SIZE)) {
> +    nbdkit_error ("%s is not aligned to sectors", "write");
> +    return -1;
> +  }
> +  if (!IS_ALIGNED (count, VIXDISKLIB_SECTOR_SIZE)) {
> +    nbdkit_error ("%s is not aligned to sectors", "write");
> +    return -1;
> +  }
> +  offset /= VIXDISKLIB_SECTOR_SIZE;
> +  count /= VIXDISKLIB_SECTOR_SIZE;
> +
> +  VDDK_CALL_START (VixDiskLib_WriteAsync,
> +                   "handle, %" PRIu64 " sectors, "
> +                   "%" PRIu32 " sectors, buffer, callback, %" PRIu64,
> +                   offset, count, cmd->id)
> +    err = VixDiskLib_WriteAsync (h->handle, offset, count, buf,
> +                                 complete_command, cmd);
> +  VDDK_CALL_END (VixDiskLib_WriteAsync, count * VIXDISKLIB_SECTOR_SIZE);
> +  if (err != VIX_ASYNC) {
> +    VDDK_ERROR (err, "VixDiskLib_WriteAsync");
> +    return -1;
> +  }
> +
> +  return 0;
> +}
> +
> +static int
> +do_flush (struct command *cmd, struct vddk_handle *h)
> +{
> +  VixError err;
> +
> +  /* It seems safer to wait for outstanding asynchronous commands to
> +   * complete before doing a flush, so do this but ignore errors
> +   * except to print them.
> +   */
> +  VDDK_CALL_START (VixDiskLib_Wait, "handle")
> +    err = VixDiskLib_Wait (h->handle);
> +  VDDK_CALL_END (VixDiskLib_Wait, 0);
> +  if (err != VIX_OK)
> +    VDDK_ERROR (err, "VixDiskLib_Wait");
> +
> +  /* The documentation for Flush is missing, but the comment in the
> +   * header file seems to indicate that it waits for WriteAsync
> +   * commands to finish.  There's a new function Wait to wait for
> +   * those.  However I verified using strace that in fact Flush calls
> +   * fsync on the file so it appears to be the correct call to use
> +   * here.
> +   */
> +  VDDK_CALL_START (VixDiskLib_Flush, "handle")
> +    err = VixDiskLib_Flush (h->handle);
> +  VDDK_CALL_END (VixDiskLib_Flush, 0);
> +  if (err != VIX_OK) {
> +    VDDK_ERROR (err, "VixDiskLib_Flush");
> +    return -1;
> +  }
> +
> +  return 0;
> +}
> +
> +static int
> +do_can_extents (struct command *cmd, struct vddk_handle *h)
> +{
> +  VixError err;
> +  VixDiskLibBlockList *block_list;
> +
> +  /* This call was added in VDDK 6.7.  In earlier versions the
> +   * function pointer will be NULL and we cannot query extents.
> +   */
> +  if (VixDiskLib_QueryAllocatedBlocks == NULL) {
> +    nbdkit_debug ("can_extents: VixDiskLib_QueryAllocatedBlocks == NULL, "
> +                  "probably this is VDDK < 6.7");
> +    return 0;
> +  }
> +
> +  /* Suppress errors around this call.  See:
> +   * https://bugzilla.redhat.com/show_bug.cgi?id=1709211#c7
> +   */
> +  error_suppression = 1;
> +
> +  /* However even when the call is available it rarely works well so
> +   * the best thing we can do here is to try the call and if it's
> +   * non-functional return false.
> +   */
> +  VDDK_CALL_START (VixDiskLib_QueryAllocatedBlocks,
> +                   "handle, 0, %d sectors, %d sectors",
> +                   VIXDISKLIB_MIN_CHUNK_SIZE, VIXDISKLIB_MIN_CHUNK_SIZE)
> +    err = VixDiskLib_QueryAllocatedBlocks (h->handle,
> +                                           0, VIXDISKLIB_MIN_CHUNK_SIZE,
> +                                           VIXDISKLIB_MIN_CHUNK_SIZE,
> +                                           &block_list);
> +  VDDK_CALL_END (VixDiskLib_QueryAllocatedBlocks, 0);
> +  error_suppression = 0;
> +  if (err == VIX_OK) {
> +    VDDK_CALL_START (VixDiskLib_FreeBlockList, "block_list")
> +      VixDiskLib_FreeBlockList (block_list);
> +    VDDK_CALL_END (VixDiskLib_FreeBlockList, 0);
> +  }
> +  if (err != VIX_OK) {
> +    char *errmsg = VixDiskLib_GetErrorText (err, NULL);
> +    nbdkit_debug ("can_extents: "
> +                  "VixDiskLib_QueryAllocatedBlocks test failed, "
> +                  "extents support will be disabled: "
> +                  "original error: %s",
> +                  errmsg);
> +    VixDiskLib_FreeErrorText (errmsg);
> +    return 0;
> +  }
> +
> +  return 1;
> +}
> +
> +/* Add an extent to the list of extents. */
> +static int
> +add_extent (struct nbdkit_extents *extents,
> +            uint64_t *position, uint64_t next_position, bool is_hole)
> +{
> +  uint32_t type = 0;
> +  const uint64_t length = next_position - *position;
> +
> +  if (is_hole) {
> +    type = NBDKIT_EXTENT_HOLE;
> +    /* Images opened as single link might be backed by another file in the
> +       chain, so the holes are not guaranteed to be zeroes. */
> +    if (!single_link)
> +      type |= NBDKIT_EXTENT_ZERO;
> +  }
> +
> +  assert (*position <= next_position);
> +  if (*position == next_position)
> +    return 0;
> +
> +  if (vddk_debug_extents)
> +    nbdkit_debug ("adding extent type %s at [%" PRIu64 "...%" PRIu64 "]",
> +                  is_hole ? "hole" : "allocated data",
> +                  *position, next_position-1);
> +  if (nbdkit_add_extent (extents, *position, length, type) == -1)
> +    return -1;
> +
> +  *position = next_position;
> +  return 0;
> +}
> +
> +static int
> +do_extents (struct command *cmd, struct vddk_handle *h)
> +{
> +  uint32_t count = cmd->count;
> +  uint64_t offset = cmd->offset;
> +  bool req_one = cmd->req_one;
> +  struct nbdkit_extents *extents = cmd->ptr;
> +  uint64_t position, end, start_sector;
> +
> +  position = offset;
> +  end = offset + count;
> +
> +  /* We can only query whole chunks.  Therefore start with the
> +   * first chunk before offset.
> +   */
> +  start_sector =
> +    ROUND_DOWN (offset, VIXDISKLIB_MIN_CHUNK_SIZE * VIXDISKLIB_SECTOR_SIZE)
> +    / VIXDISKLIB_SECTOR_SIZE;
> +  while (start_sector * VIXDISKLIB_SECTOR_SIZE < end) {
> +    VixError err;
> +    uint32_t i;
> +    uint64_t nr_chunks, nr_sectors;
> +    VixDiskLibBlockList *block_list;
> +
> +    assert (IS_ALIGNED (start_sector, VIXDISKLIB_MIN_CHUNK_SIZE));
> +
> +    nr_chunks =
> +      ROUND_UP (end - start_sector * VIXDISKLIB_SECTOR_SIZE,
> +                VIXDISKLIB_MIN_CHUNK_SIZE * VIXDISKLIB_SECTOR_SIZE)
> +      / (VIXDISKLIB_MIN_CHUNK_SIZE * VIXDISKLIB_SECTOR_SIZE);
> +    nr_chunks = MIN (nr_chunks, VIXDISKLIB_MAX_CHUNK_NUMBER);
> +    nr_sectors = nr_chunks * VIXDISKLIB_MIN_CHUNK_SIZE;
> +
> +    VDDK_CALL_START (VixDiskLib_QueryAllocatedBlocks,
> +                     "handle, %" PRIu64 " sectors, %" PRIu64 " sectors, "
> +                     "%d sectors",
> +                     start_sector, nr_sectors, VIXDISKLIB_MIN_CHUNK_SIZE)
> +      err = VixDiskLib_QueryAllocatedBlocks (h->handle,
> +                                             start_sector, nr_sectors,
> +                                             VIXDISKLIB_MIN_CHUNK_SIZE,
> +                                             &block_list);
> +    VDDK_CALL_END (VixDiskLib_QueryAllocatedBlocks, 0);
> +    if (err != VIX_OK) {
> +      VDDK_ERROR (err, "VixDiskLib_QueryAllocatedBlocks");
> +      return -1;
> +    }
> +
> +    for (i = 0; i < block_list->numBlocks; ++i) {
> +      uint64_t blk_offset, blk_length;
> +
> +      blk_offset = block_list->blocks[i].offset * VIXDISKLIB_SECTOR_SIZE;
> +      blk_length = block_list->blocks[i].length * VIXDISKLIB_SECTOR_SIZE;
> +
> +      /* The query returns allocated blocks.  We must insert holes
> +       * between the blocks as necessary.
> +       */
> +      if ((position < blk_offset &&
> +           add_extent (extents, &position, blk_offset, true) == -1) ||
> +          (add_extent (extents,
> +                       &position, blk_offset + blk_length, false) == -1)) {
> +        VDDK_CALL_START (VixDiskLib_FreeBlockList, "block_list")
> +          VixDiskLib_FreeBlockList (block_list);
> +        VDDK_CALL_END (VixDiskLib_FreeBlockList, 0);
> +        return -1;
> +      }
> +    }
> +    VDDK_CALL_START (VixDiskLib_FreeBlockList, "block_list")
> +      VixDiskLib_FreeBlockList (block_list);
> +    VDDK_CALL_END (VixDiskLib_FreeBlockList, 0);
> +
> +    /* There's an implicit hole after the returned list of blocks,
> +     * up to the end of the QueryAllocatedBlocks request.
> +     */
> +    if (add_extent (extents,
> +                    &position,
> +                    (start_sector + nr_sectors) * VIXDISKLIB_SECTOR_SIZE,
> +                    true) == -1) {
> +      return -1;
> +    }
> +
> +    start_sector += nr_sectors;
> +
> +    /* If one extent was requested, as long as we've added an extent
> +     * overlapping the original offset we're done.
> +     */
> +    if (req_one && position > offset)
> +      break;
> +  }
> +
> +  return 0;
> +}
> +
> +/* Background worker thread, one per connection, which is where the
> + * VDDK commands are issued.
> + */
> +void *
> +vddk_worker_thread (void *handle)
> +{
> +  struct vddk_handle *h = handle;
> +  bool stop = false;
> +
> +  while (!stop) {
> +    struct command *cmd;
> +    int r;
> +    bool async = false;
> +
> +    /* Wait until we are sent at least one command. */
> +    {
> +      ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&h->commands_lock);
> +      while (h->commands.size == 0)
> +        pthread_cond_wait (&h->commands_cond, &h->commands_lock);
> +      cmd = h->commands.ptr[0];
> +      command_queue_remove (&h->commands, 0);
> +    }
> +
> +    switch (cmd->type) {
> +    case STOP:
> +      r = do_stop (cmd, h);
> +      stop = true;
> +      break;
> +
> +    case GET_SIZE: {
> +      int64_t size = do_get_size (cmd, h);
> +      if (size == -1)
> +        r = -1;
> +      else {
> +        r = 0;
> +        *(uint64_t *)cmd->ptr = size;
> +      }
> +      break;
> +    }
> +
> +    case READ:
> +      r = do_read (cmd, h);
> +      /* If async is true, don't retire this command now. */
> +      async = r == 0;
> +      break;
> +
> +    case WRITE:
> +      r = do_write (cmd, h);
> +      /* If async is true, don't retire this command now. */
> +      async = r == 0;
> +      break;
> +
> +    case FLUSH:
> +      r = do_flush (cmd, h);
> +      break;
> +
> +    case CAN_EXTENTS:
> +      r = do_can_extents (cmd, h);
> +      if (r >= 0)
> +        *(int *)cmd->ptr = r;
> +      break;
> +
> +    case EXTENTS:
> +      r = do_extents (cmd, h);
> +      break;
> +
> +    default: abort (); /* impossible, but keeps GCC happy */
> +    } /* switch */
> +
> +    if (!async) {
> +      /* Update the command status. */
> +      ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&cmd->mutex);
> +      cmd->status = r >= 0 ? SUCCEEDED : FAILED;
> +
> +      /* For synchronous commands signal the caller thread that the
> +       * command has completed.  (Asynchronous commands are completed in
> +       * the callback handler).
> +       */
> +      pthread_cond_signal (&cmd->cond);
> +    }
> +  } /* while (!stop) */
> +
> +  /* Exit the worker thread. */
> +  return NULL;
> +}
> diff --git a/tests/dummy-vddk.c b/tests/dummy-vddk.c
> index cb88380c7..b6f120427 100644
> --- a/tests/dummy-vddk.c
> +++ b/tests/dummy-vddk.c
> @@ -188,6 +188,19 @@ VixDiskLib_Read (VixDiskLibHandle handle,
>    return VIX_OK;
>  }
>  
> +NBDKIT_DLL_PUBLIC VixError
> +VixDiskLib_ReadAsync (VixDiskLibHandle handle,
> +                      uint64_t start_sector, uint64_t nr_sectors,
> +                      unsigned char *buf,
> +                      VixDiskLibCompletionCB callback, void *data)
> +{
> +  size_t offset = start_sector * VIXDISKLIB_SECTOR_SIZE;
> +
> +  memcpy (buf, disk + offset, nr_sectors * VIXDISKLIB_SECTOR_SIZE);
> +  callback (data, VIX_OK);
> +  return VIX_ASYNC;
> +}
> +
>  NBDKIT_DLL_PUBLIC VixError
>  VixDiskLib_Write (VixDiskLibHandle handle,
>                    uint64_t start_sector, uint64_t nr_sectors,
> @@ -199,6 +212,25 @@ VixDiskLib_Write (VixDiskLibHandle handle,
>    return VIX_OK;
>  }
>  
> +NBDKIT_DLL_PUBLIC VixError
> +VixDiskLib_WriteAsync (VixDiskLibHandle handle,
> +                       uint64_t start_sector, uint64_t nr_sectors,
> +                       const unsigned char *buf,
> +                       VixDiskLibCompletionCB callback, void *data)
> +{
> +  size_t offset = start_sector * VIXDISKLIB_SECTOR_SIZE;
> +
> +  memcpy (disk + offset, buf, nr_sectors * VIXDISKLIB_SECTOR_SIZE);
> +  callback (data, VIX_OK);
> +  return VIX_ASYNC;
> +}
> +
> +NBDKIT_DLL_PUBLIC VixError
> +VixDiskLib_Flush (VixDiskLibHandle handle)
> +{
> +  return VIX_OK;
> +}
> +
>  NBDKIT_DLL_PUBLIC VixError
>  VixDiskLib_Wait (VixDiskLibHandle handle)
>  {
> 

_______________________________________________
Libguestfs mailing list
[email protected]
https://listman.redhat.com/mailman/listinfo/libguestfs

Reply via email to