On Mon, Nov 27, 2023 at 05:26:00PM -0300, Fabiano Rosas wrote:
> Currently multifd does not need to have knowledge of pages on the
> receiving side because all the information needed is within the
> packets that come in the stream.
> 
> We're about to add support to fixed-ram migration, which cannot use
> packets because it expects the ramblock section in the migration file
> to contain only the guest pages data.
> 
> Add a data structure to transfer pages between the ram migration code
> and the multifd receiving threads.
> 
> We don't want to reuse MultiFDPages_t for two reasons:
> 
> a) multifd threads don't really need to know about the data they're
>    receiving.
> 
> b) the receiving side has to be stopped to load the pages, which means
>    we can experiment with larger granularities than page size when
>    transferring data.
> 
> Signed-off-by: Fabiano Rosas <faro...@suse.de>
> ---
> - stopped using MultiFDPages_t and added a new structure which can
>   take offset + size
> ---
>  migration/multifd.c | 122 ++++++++++++++++++++++++++++++++++++++++++--
>  migration/multifd.h |  20 ++++++++
>  2 files changed, 138 insertions(+), 4 deletions(-)
> 
> diff --git a/migration/multifd.c b/migration/multifd.c
> index c1381bdc21..7dfab2367a 100644
> --- a/migration/multifd.c
> +++ b/migration/multifd.c
> @@ -142,17 +142,36 @@ static void nocomp_recv_cleanup(MultiFDRecvParams *p)
>  static int nocomp_recv_data(MultiFDRecvParams *p, Error **errp)
>  {
>      uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
> +    ERRP_GUARD();
>  
>      if (flags != MULTIFD_FLAG_NOCOMP) {
>          error_setg(errp, "multifd %u: flags received %x flags expected %x",
>                     p->id, flags, MULTIFD_FLAG_NOCOMP);
>          return -1;
>      }
> -    for (int i = 0; i < p->normal_num; i++) {
> -        p->iov[i].iov_base = p->host + p->normal[i];
> -        p->iov[i].iov_len = p->page_size;
> +
> +    if (!migrate_multifd_packets()) {
> +        MultiFDRecvData *data = p->data;
> +        size_t ret;
> +
> +        ret = qio_channel_pread(p->c, (char *) data->opaque,
> +                                data->size, data->file_offset, errp);
> +        if (ret != data->size) {
> +            error_prepend(errp,
> +                          "multifd recv (%u): read 0x%zx, expected 0x%zx",
> +                          p->id, ret, data->size);
> +            return -1;
> +        }
> +
> +        return 0;
> +    } else {
> +        for (int i = 0; i < p->normal_num; i++) {
> +            p->iov[i].iov_base = p->host + p->normal[i];
> +            p->iov[i].iov_len = p->page_size;
> +        }
> +
> +        return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp);
>      }

I guess you managed to squash the file loads into the "no compression"
handler of multifd, but IMHO that's not as clean.

Firstly, if we do it this way, we'd better make sure multifd-compression is
never enabled together with fixed-ram.  I haven't seen such protection in
the series yet.  If both get enabled, I think we should expect crashes,
because the file loads will go into the zlib/zstd paths.
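
To illustrate the kind of protection I mean, here is a minimal standalone
sketch.  All names in it (MigCaps, CompressionMethod,
check_fixed_ram_caps) are made up for the example and are not the QEMU
API; the real check would presumably live wherever capabilities and
parameters are validated.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins for the migration capabilities/parameters. */
typedef enum {
    COMPRESSION_NONE,
    COMPRESSION_ZLIB,
    COMPRESSION_ZSTD,
} CompressionMethod;

typedef struct {
    bool fixed_ram;
    bool multifd;
    CompressionMethod multifd_compression;
} MigCaps;

/*
 * Returns NULL when the combination is acceptable, otherwise a message
 * describing why it has to be rejected.
 */
static const char *check_fixed_ram_caps(const MigCaps *caps)
{
    assert(caps != NULL);

    if (caps->fixed_ram && caps->multifd &&
        caps->multifd_compression != COMPRESSION_NONE) {
        return "fixed-ram is not compatible with multifd compression";
    }
    return NULL;
}
```

Rejecting the combination up front means the zlib/zstd recv paths can
never be reached for a file.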

IMHO the only model fixed-ram can share with multifd is the task management
part: mutexes, semaphores, etc.  IIRC I mentioned before that it'd be nice
to simply have a pool of threads that we can enqueue tasks to.  If that's
too far away, would something like the below be closer to that?  What I'm
thinking:

  - patch 1: rename MultiFDMethods to MultiFDCompressMethods, this can
    replace the other patch to do s/recv_pages/recv_data/

  - patch 2: introduce MultiFDMethods (on top of MultiFDCompressMethods),
    refactor the current code to provide the socket version of MultiFDMethods.

  - patch 3: add the fixed-ram "file" version of MultiFDMethods

MultiFDCompressMethods doesn't need to be used at all for "file" version of
MultiFDMethods.

Would that work?
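
A rough standalone sketch of the layering I have in mind, to make it
concrete.  The type and function names (CompressMethodsSketch,
MethodsSketch, socket_recv_data, file_recv_data) are hypothetical, and
the return values are only markers so the two paths can be told apart:

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical: what is called MultiFDMethods today (nocomp/zlib/zstd). */
typedef struct {
    int (*recv_pages)(void *chan);
} CompressMethodsSketch;

/* Hypothetical: the new top-level, per-transport set of methods. */
typedef struct MethodsSketch {
    const char *name;
    const CompressMethodsSketch *compress;  /* NULL for the "file" version */
    int (*recv_data)(const struct MethodsSketch *ops, void *chan);
} MethodsSketch;

static int nocomp_recv_pages(void *chan)
{
    (void)chan;
    return 1;  /* marker: went through the compression layer */
}

/* Socket version: always delegates to the compression methods. */
static int socket_recv_data(const MethodsSketch *ops, void *chan)
{
    assert(ops->compress != NULL);
    return ops->compress->recv_pages(chan);
}

/* File version: never touches the compression methods. */
static int file_recv_data(const MethodsSketch *ops, void *chan)
{
    (void)ops;
    (void)chan;
    return 2;  /* marker: would pread() the chunk straight from the file */
}

static const CompressMethodsSketch nocomp_ops = {
    .recv_pages = nocomp_recv_pages,
};
static const MethodsSketch socket_ops = {
    .name = "socket", .compress = &nocomp_ops, .recv_data = socket_recv_data,
};
static const MethodsSketch file_ops = {
    .name = "file", .compress = NULL, .recv_data = file_recv_data,
};
```

With this split the nocomp handler needs no migrate_multifd_packets()
branch, because the file case never reaches it.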

> -    return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp);
>  }
>  
>  static MultiFDMethods multifd_nocomp_ops = {
> @@ -989,6 +1008,7 @@ int multifd_save_setup(Error **errp)
>  
>  struct {
>      MultiFDRecvParams *params;
> +    MultiFDRecvData *data;

(If the above works, maybe we can split MultiFDRecvParams into two chunks:
 one used by both, one only for sockets?)

>      /* number of created threads */
>      int count;
>      /* syncs main thread and channels */
> @@ -999,6 +1019,49 @@ struct {
>      MultiFDMethods *ops;
>  } *multifd_recv_state;
>  
> +int multifd_recv(void)
> +{
> +    int i;
> +    static int next_recv_channel;
> +    MultiFDRecvParams *p = NULL;
> +    MultiFDRecvData *data = multifd_recv_state->data;
> +
> +    /*
> +     * next_channel can remain from a previous migration that was
> +     * using more channels, so ensure it doesn't overflow if the
> +     * limit is lower now.
> +     */
> +    next_recv_channel %= migrate_multifd_channels();
> +    for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) {
> +        p = &multifd_recv_state->params[i];
> +
> +        qemu_mutex_lock(&p->mutex);
> +        if (p->quit) {
> +            error_report("%s: channel %d has already quit!", __func__, i);
> +            qemu_mutex_unlock(&p->mutex);
> +            return -1;
> +        }
> +        if (!p->pending_job) {
> +            p->pending_job++;
> +            next_recv_channel = (i + 1) % migrate_multifd_channels();
> +            break;
> +        }
> +        qemu_mutex_unlock(&p->mutex);
> +    }
> +    assert(p->data->size == 0);
> +    multifd_recv_state->data = p->data;
> +    p->data = data;
> +    qemu_mutex_unlock(&p->mutex);
> +    qemu_sem_post(&p->sem);
> +
> +    return 1;
> +}

PS: if we had the pool model we could already mostly merge the above code
with multifd_send_pages(), because this would become a common helper to
enqueue a task to a pool, no matter whether it's for writing (to
file/socket) or reading (only from file).
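
Something along these lines, as a single-threaded model only.  Pool,
PoolWorker and pool_enqueue are hypothetical names; real code would take
the per-channel mutex and post its semaphore, and would keep waiting for
an idle worker instead of giving up after one pass as this sketch does:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define POOL_SIZE 4  /* hypothetical; stands in for the channel count */

typedef struct {
    bool busy;   /* plays the role of pending_job */
    bool quit;
    void *task;  /* opaque: pages to send, or a file chunk to read */
} PoolWorker;

typedef struct {
    PoolWorker workers[POOL_SIZE];
    size_t next;  /* where the round-robin search starts */
} Pool;

/*
 * Hands 'task' to the next idle worker, searching round-robin from
 * pool->next.  Returns the worker index, or -1 if a worker that has
 * already quit is encountered or no worker is idle.
 */
static int pool_enqueue(Pool *pool, void *task)
{
    assert(pool != NULL);

    for (size_t n = 0; n < POOL_SIZE; n++) {
        size_t i = (pool->next + n) % POOL_SIZE;
        PoolWorker *w = &pool->workers[i];

        if (w->quit) {
            return -1;
        }
        if (!w->busy) {
            w->busy = true;
            w->task = task;
            pool->next = (i + 1) % POOL_SIZE;
            return (int)i;
        }
    }
    return -1;
}
```

The helper doesn't care what the task is, which is why the send and recv
sides could share it.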

> +
> +MultiFDRecvData *multifd_get_recv_data(void)
> +{
> +    return multifd_recv_state->data;
> +}
> +
>  static void multifd_recv_terminate_threads(Error *err)
>  {
>      int i;
> @@ -1020,6 +1083,7 @@ static void multifd_recv_terminate_threads(Error *err)
>  
>          qemu_mutex_lock(&p->mutex);
>          p->quit = true;
> +        qemu_sem_post(&p->sem);
>          /*
>           * We could arrive here for two reasons:
>           *  - normal quit, i.e. everything went fine, just finished
> @@ -1069,6 +1133,7 @@ void multifd_load_cleanup(void)
>          p->c = NULL;
>          qemu_mutex_destroy(&p->mutex);
>          qemu_sem_destroy(&p->sem_sync);
> +        qemu_sem_destroy(&p->sem);
>          g_free(p->name);
>          p->name = NULL;
>          p->packet_len = 0;
> @@ -1083,6 +1148,8 @@ void multifd_load_cleanup(void)
>      qemu_sem_destroy(&multifd_recv_state->sem_sync);
>      g_free(multifd_recv_state->params);
>      multifd_recv_state->params = NULL;
> +    g_free(multifd_recv_state->data);
> +    multifd_recv_state->data = NULL;
>      g_free(multifd_recv_state);
>      multifd_recv_state = NULL;
>  }
> @@ -1094,6 +1161,21 @@ void multifd_recv_sync_main(void)
>      if (!migrate_multifd() || !migrate_multifd_packets()) {

[1]

>          return;
>      }
> +
> +    if (!migrate_multifd_packets()) {

Hmm, isn't this checked already above at [1]?  Could this path ever trigger
then?  Maybe we need to drop the one at [1]?

IIUC what you wanted to do here is to rely on the last RAM_SAVE_FLAG_EOS in
the image file to do a full flush, making sure all pages are loaded.

You may want to be careful about the side effect of the
flush_after_each_section parameter:

        case RAM_SAVE_FLAG_EOS:
            /* normal exit */
            if (migrate_multifd() &&
                migrate_multifd_flush_after_each_section()) {
                multifd_recv_sync_main();
            }

You may want to always flush for file?
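
Expressed as a standalone predicate, only to show the condition I mean.
should_sync_on_eos and its file_load argument are hypothetical; how the
load side learns that it is reading from a file is left open here:

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Decides whether the EOS handler should call the recv sync.
 * For a file load the sync must not depend on the
 * flush_after_each_section setting, otherwise the final flush
 * could be skipped.
 */
static bool should_sync_on_eos(bool multifd,
                               bool flush_after_each_section,
                               bool file_load)
{
    if (!multifd) {
        return false;
    }
    return file_load || flush_after_each_section;
}
```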

> +        for (i = 0; i < migrate_multifd_channels(); i++) {
> +            MultiFDRecvParams *p = &multifd_recv_state->params[i];
> +
> +            qemu_sem_post(&p->sem);
> +            qemu_sem_wait(&p->sem_sync);
> +
> +            qemu_mutex_lock(&p->mutex);
> +            assert(!p->pending_job || p->quit);
> +            qemu_mutex_unlock(&p->mutex);
> +        }
> +        return;

Btw, how does this kick off all the recv threads?  Is it because you did a
sem_post(&sem) with p->pending_job==false this time?

Maybe it's clearer to just set p->quit (or a global quit knob) somewhere?
That would make it clear that this is a one-shot thing, only needed at the
end of the file incoming migration.

> +    }
> +
>      for (i = 0; i < migrate_multifd_channels(); i++) {
>          MultiFDRecvParams *p = &multifd_recv_state->params[i];
>  
> @@ -1156,6 +1238,18 @@ static void *multifd_recv_thread(void *opaque)
>  
>              p->total_normal_pages += p->normal_num;
>              has_data = !!p->normal_num;
> +        } else {
> +            /*
> +             * No packets, so we need to wait for the vmstate code to
> +             * give us work.
> +             */
> +            qemu_sem_wait(&p->sem);
> +            qemu_mutex_lock(&p->mutex);
> +            if (!p->pending_job) {
> +                qemu_mutex_unlock(&p->mutex);
> +                break;
> +            }
> +            has_data = !!p->data->size;
>          }
>  
>          qemu_mutex_unlock(&p->mutex);
> @@ -1171,6 +1265,17 @@ static void *multifd_recv_thread(void *opaque)
>              qemu_sem_post(&multifd_recv_state->sem_sync);
>              qemu_sem_wait(&p->sem_sync);
>          }
> +
> +        if (!use_packets) {
> +            qemu_mutex_lock(&p->mutex);
> +            p->data->size = 0;
> +            p->pending_job--;
> +            qemu_mutex_unlock(&p->mutex);
> +        }
> +    }
> +
> +    if (!use_packets) {
> +        qemu_sem_post(&p->sem_sync);

Currently sem_sync is only posted with the MULTIFD_FLAG_SYNC flag.  We'd
better be careful about reusing it.

Maybe add some comment above recv_state->sem_sync?

  /*
   * For sockets: this is posted once for each MULTIFD_FLAG_SYNC flag.
   *
   * For files: this is only posted at the end of the file load to mark
   *            completion of the load process.
   */

>      }
>  
>      if (local_err) {
> @@ -1205,6 +1310,10 @@ int multifd_load_setup(Error **errp)
>      thread_count = migrate_multifd_channels();
>      multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
>      multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
> +
> +    multifd_recv_state->data = g_new0(MultiFDRecvData, 1);
> +    multifd_recv_state->data->size = 0;
> +
>      qatomic_set(&multifd_recv_state->count, 0);
>      qemu_sem_init(&multifd_recv_state->sem_sync, 0);
>      multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];
> @@ -1214,9 +1323,14 @@ int multifd_load_setup(Error **errp)
>  
>          qemu_mutex_init(&p->mutex);
>          qemu_sem_init(&p->sem_sync, 0);
> +        qemu_sem_init(&p->sem, 0);
>          p->quit = false;
> +        p->pending_job = 0;
>          p->id = i;
>  
> +        p->data = g_new0(MultiFDRecvData, 1);
> +        p->data->size = 0;
> +
>          if (use_packets) {
>              p->packet_len = sizeof(MultiFDPacket_t)
>                  + sizeof(uint64_t) * page_count;
> diff --git a/migration/multifd.h b/migration/multifd.h
> index 406d42dbae..abaf16c3f2 100644
> --- a/migration/multifd.h
> +++ b/migration/multifd.h
> @@ -13,6 +13,8 @@
>  #ifndef QEMU_MIGRATION_MULTIFD_H
>  #define QEMU_MIGRATION_MULTIFD_H
>  
> +typedef struct MultiFDRecvData MultiFDRecvData;
> +
>  int multifd_save_setup(Error **errp);
>  void multifd_save_cleanup(void);
>  int multifd_load_setup(Error **errp);
> @@ -24,6 +26,8 @@ void multifd_recv_new_channel(QIOChannel *ioc, Error **errp);
>  void multifd_recv_sync_main(void);
>  int multifd_send_sync_main(QEMUFile *f);
>  int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset);
> +int multifd_recv(void);
> +MultiFDRecvData *multifd_get_recv_data(void);
>  
>  /* Multifd Compression flags */
>  #define MULTIFD_FLAG_SYNC (1 << 0)
> @@ -66,6 +70,13 @@ typedef struct {
>      RAMBlock *block;
>  } MultiFDPages_t;
>  
> +struct MultiFDRecvData {
> +    void *opaque;
> +    size_t size;
> +    /* for preadv */
> +    off_t file_offset;
> +};
> +
>  typedef struct {
>      /* Fields are only written at creating/deletion time */
>      /* No lock required for them, they are read only */
> @@ -156,6 +167,8 @@ typedef struct {
>  
>      /* syncs main thread and channels */
>      QemuSemaphore sem_sync;
> +    /* sem where to wait for more work */
> +    QemuSemaphore sem;
>  
>      /* this mutex protects the following parameters */
>      QemuMutex mutex;
> @@ -167,6 +180,13 @@ typedef struct {
>      uint32_t flags;
>      /* global number of generated multifd packets */
>      uint64_t packet_num;
> +    int pending_job;
> +    /*
> +     * The owner of 'data' depends of 'pending_job' value:
> +     * pending_job == 0 -> migration_thread can use it.
> +     * pending_job != 0 -> multifd_channel can use it.
> +     */
> +    MultiFDRecvData *data;

Right after the main thread assigns a chunk of memory for a recv thread to
load, the main thread's job is done, afaict.  I don't see how a race could
happen here.

I'm not sure, but I _think_ if we rely on p->quit or something similar to
quit all recv threads, then this can be dropped?

>  
>      /* thread local variables. No locking required */
>  
> -- 
> 2.35.3
> 

-- 
Peter Xu

