On Mon, May 01, 2023 at 05:01:36PM +0300, Avihai Horon wrote:
> Add the core functionality of precopy initial data, which allows the
> destination to ACK that initial data has been loaded and the source to
> wait for this ACK before completing the migration.
> 
> A new return path command MIG_RP_MSG_INITIAL_DATA_LOADED_ACK is added.
> It is sent by the destination after precopy initial data is loaded to
> ACK to the source that precopy initial data has been loaded.
> 
> In addition, two new SaveVMHandlers handlers are added:
> 1. is_initial_data_active which indicates whether precopy initial data
>    is used for this migration user (i.e., SaveStateEntry).
> 2. initial_data_loaded which indicates whether precopy initial data has
>    been loaded by this migration user.
> 
> Signed-off-by: Avihai Horon <avih...@nvidia.com>
> ---
>  include/migration/register.h |  7 ++++++
>  migration/migration.h        | 12 +++++++++++
>  migration/migration.c        | 41 ++++++++++++++++++++++++++++++++++--
>  migration/savevm.c           | 39 ++++++++++++++++++++++++++++++++++
>  migration/trace-events       |  2 ++
>  5 files changed, 99 insertions(+), 2 deletions(-)
> 
> diff --git a/include/migration/register.h b/include/migration/register.h
> index 0a73f3883e..297bbe9f73 100644
> --- a/include/migration/register.h
> +++ b/include/migration/register.h
> @@ -77,6 +77,13 @@ typedef struct SaveVMHandlers {
>       * true if it's supported or false otherwise. Called both in src and dest.
>       */
>      bool (*initial_data_advise)(void *opaque);
> +    /*
> +     * Checks if precopy initial data is active. If it's inactive,
> +     * initial_data_loaded check is skipped.
> +     */
> +    bool (*is_initial_data_active)(void *opaque);
> +    /* Checks if precopy initial data has been loaded in dest */
> +    bool (*initial_data_loaded)(void *opaque);
>  } SaveVMHandlers;
>  
>  int register_savevm_live(const char *idstr,
> diff --git a/migration/migration.h b/migration/migration.h
> index 4f615e9dbc..d865c23d87 100644
> --- a/migration/migration.h
> +++ b/migration/migration.h
> @@ -207,6 +207,11 @@ struct MigrationIncomingState {
>  
>      /* Indicates whether precopy initial data was enabled and should be used 
> */
>      bool initial_data_enabled;
> +    /*
> +     * Indicates whether an ACK that precopy initial data was loaded has been
> +     * sent to source.
> +     */
> +    bool initial_data_loaded_ack_sent;
>  };
>  
>  MigrationIncomingState *migration_incoming_get_current(void);
> @@ -435,6 +440,12 @@ struct MigrationState {
>  
>      /* QEMU_VM_VMDESCRIPTION content filled for all non-iterable devices. */
>      JSONWriter *vmdesc;
> +
> +    /*
> +     * Indicates whether an ACK that precopy initial data was loaded in
> +     * destination has been received.
> +     */
> +    bool initial_data_loaded_acked;
>  };
>  
>  void migrate_set_state(int *state, int old_state, int new_state);
> @@ -475,6 +486,7 @@ int 
> migrate_send_rp_message_req_pages(MigrationIncomingState *mis,
>  void migrate_send_rp_recv_bitmap(MigrationIncomingState *mis,
>                                   char *block_name);
>  void migrate_send_rp_resume_ack(MigrationIncomingState *mis, uint32_t value);
> +int migrate_send_rp_initial_data_loaded_ack(MigrationIncomingState *mis);
>  
>  void dirty_bitmap_mig_before_vm_start(void);
>  void dirty_bitmap_mig_cancel_outgoing(void);
> diff --git a/migration/migration.c b/migration/migration.c
> index 68cdf5b184..304cab2fa1 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -77,6 +77,11 @@ enum mig_rp_message_type {
>      MIG_RP_MSG_RECV_BITMAP,  /* send recved_bitmap back to source */
>      MIG_RP_MSG_RESUME_ACK,   /* tell source that we are ready to resume */
>  
> +    MIG_RP_MSG_INITIAL_DATA_LOADED_ACK, /*
> +                                         * Tell source precopy initial data 
> is
> +                                         * loaded.
> +                                         */
> +
>      MIG_RP_MSG_MAX
>  };
>  
> @@ -756,6 +761,12 @@ bool migration_has_all_channels(void)
>      return true;
>  }
>  
> +int migrate_send_rp_initial_data_loaded_ack(MigrationIncomingState *mis)
> +{
> +    return migrate_send_rp_message(mis, MIG_RP_MSG_INITIAL_DATA_LOADED_ACK, 
> 0,
> +                                   NULL);
> +}
> +
>  /*
>   * Send a 'SHUT' message on the return channel with the given value
>   * to indicate that we've finished with the RP.  Non-0 value indicates
> @@ -1401,6 +1412,8 @@ void migrate_init(MigrationState *s)
>      s->vm_was_running = false;
>      s->iteration_initial_bytes = 0;
>      s->threshold_size = 0;
> +
> +    s->initial_data_loaded_acked = false;
>  }
>  
>  int migrate_add_blocker_internal(Error *reason, Error **errp)
> @@ -1717,6 +1730,9 @@ static struct rp_cmd_args {
>      [MIG_RP_MSG_REQ_PAGES_ID]   = { .len = -1, .name = "REQ_PAGES_ID" },
>      [MIG_RP_MSG_RECV_BITMAP]    = { .len = -1, .name = "RECV_BITMAP" },
>      [MIG_RP_MSG_RESUME_ACK]     = { .len =  4, .name = "RESUME_ACK" },
> +    [MIG_RP_MSG_INITIAL_DATA_LOADED_ACK] = { .len = 0,
> +                                             .name =
> +                                                 "INITIAL_DATA_LOADED_ACK" },
>      [MIG_RP_MSG_MAX]            = { .len = -1, .name = "MAX" },
>  };
>  
> @@ -1955,6 +1971,11 @@ retry:
>              }
>              break;
>  
> +        case MIG_RP_MSG_INITIAL_DATA_LOADED_ACK:
> +            ms->initial_data_loaded_acked = true;
> +            trace_source_return_path_thread_initial_data_loaded_ack();
> +            break;
> +
>          default:
>              break;
>          }
> @@ -2704,6 +2725,20 @@ static void migration_update_counters(MigrationState 
> *s,
>                                bandwidth, s->threshold_size);
>  }
>  
> +static bool initial_data_loaded_acked(MigrationState *s)
> +{
> +    if (!migrate_precopy_initial_data()) {
> +        return true;
> +    }
> +
> +    /* No reason to wait for precopy initial data loaded ACK if VM is stopped */
> +    if (!runstate_is_running()) {
> +        return true;
> +    }
> +
> +    return s->initial_data_loaded_acked;
> +}
> +
>  /* Migration thread iteration status */
>  typedef enum {
>      MIG_ITERATE_RESUME,         /* Resume current iteration */
> @@ -2719,6 +2754,7 @@ static MigIterateState 
> migration_iteration_run(MigrationState *s)
>  {
>      uint64_t must_precopy, can_postcopy;
>      bool in_postcopy = s->state == MIGRATION_STATUS_POSTCOPY_ACTIVE;
> +    bool initial_data_loaded = initial_data_loaded_acked(s);
>  
>      qemu_savevm_state_pending_estimate(&must_precopy, &can_postcopy);
>      uint64_t pending_size = must_precopy + can_postcopy;
> @@ -2731,7 +2767,8 @@ static MigIterateState 
> migration_iteration_run(MigrationState *s)
>          trace_migrate_pending_exact(pending_size, must_precopy, 
> can_postcopy);
>      }
>  
> -    if (!pending_size || pending_size < s->threshold_size) {
> +    if ((!pending_size || pending_size < s->threshold_size) &&
> +        initial_data_loaded) {
>          trace_migration_thread_low_pending(pending_size);
>          migration_completion(s);
>          return MIG_ITERATE_BREAK;
> @@ -2739,7 +2776,7 @@ static MigIterateState 
> migration_iteration_run(MigrationState *s)
>  
>      /* Still a significant amount to transfer */
>      if (!in_postcopy && must_precopy <= s->threshold_size &&
> -        qatomic_read(&s->start_postcopy)) {
> +        initial_data_loaded && qatomic_read(&s->start_postcopy)) {
>          if (postcopy_start(s)) {
>              error_report("%s: postcopy failed to start", __func__);
>          }
> diff --git a/migration/savevm.c b/migration/savevm.c
> index 2740defdf0..7a94deda3b 100644
> --- a/migration/savevm.c
> +++ b/migration/savevm.c
> @@ -2504,6 +2504,39 @@ static int loadvm_process_command(QEMUFile *f)
>      return 0;
>  }
>  
> +static int qemu_loadvm_initial_data_loaded_ack(MigrationIncomingState *mis)
> +{
> +    SaveStateEntry *se;
> +    int ret;
> +
> +    if (!mis->initial_data_enabled || mis->initial_data_loaded_ack_sent) {
> +        return 0;
> +    }
> +
> +    QTAILQ_FOREACH(se, &savevm_state.handlers, entry) {
> +        if (!se->ops || !se->ops->initial_data_loaded) {
> +            continue;
> +        }
> +
> +        if (!se->ops->is_initial_data_active ||
> +            !se->ops->is_initial_data_active(se->opaque)) {
> +            continue;
> +        }
> +
> +        if (!se->ops->initial_data_loaded(se->opaque)) {
> +            return 0;
> +        }
> +    }
> +
> +    ret = migrate_send_rp_initial_data_loaded_ack(mis);
> +    if (!ret) {
> +        mis->initial_data_loaded_ack_sent = true;
> +        trace_loadvm_initial_data_loaded_acked();
> +    }
> +
> +    return ret;
> +}
> +
>  /*
>   * Read a footer off the wire and check that it matches the expected section
>   *
> @@ -2826,6 +2859,12 @@ retry:
>              if (ret < 0) {
>                  goto out;
>              }
> +
> +            ret = qemu_loadvm_initial_data_loaded_ack(mis);
> +            if (ret < 0) {
> +                goto out;
> +            }

This is slightly hacky - it gets called for every END section.

Please consider something cleaner - e.g., the notification mechanism I
mentioned in my reply to the cover letter - so that this is called only
when the device is ready for switchover (no matter what interface it'll
use).

Thanks,

> +
>              break;
>          case QEMU_VM_COMMAND:
>              ret = loadvm_process_command(f);
> diff --git a/migration/trace-events b/migration/trace-events
> index 21ae471126..a0e1d3b2fd 100644
> --- a/migration/trace-events
> +++ b/migration/trace-events
> @@ -24,6 +24,7 @@ loadvm_postcopy_ram_handle_discard_header(const char 
> *ramid, uint16_t len) "%s:
>  loadvm_process_command(const char *s, uint16_t len) "com=%s len=%d"
>  loadvm_process_command_ping(uint32_t val) "0x%x"
>  loadvm_handle_initial_data_enable(uint8_t general_enable, const char *idstr, 
> int instance_id) "general_enable=%u, idstr=%s, instance_id=%u"
> +loadvm_initial_data_loaded_acked(void) ""
>  postcopy_ram_listen_thread_exit(void) ""
>  postcopy_ram_listen_thread_start(void) ""
>  qemu_savevm_send_postcopy_advise(void) ""
> @@ -182,6 +183,7 @@ source_return_path_thread_loop_top(void) ""
>  source_return_path_thread_pong(uint32_t val) "0x%x"
>  source_return_path_thread_shut(uint32_t val) "0x%x"
>  source_return_path_thread_resume_ack(uint32_t v) "%"PRIu32
> +source_return_path_thread_initial_data_loaded_ack(void) ""
>  migration_thread_low_pending(uint64_t pending) "%" PRIu64
>  migrate_transferred(uint64_t tranferred, uint64_t time_spent, uint64_t 
> bandwidth, uint64_t size) "transferred %" PRIu64 " time_spent %" PRIu64 " 
> bandwidth %" PRIu64 " max_size %" PRId64
>  process_incoming_migration_co_end(int ret, int ps) "ret=%d postcopy-state=%d"
> -- 
> 2.26.3
> 
> 

-- 
Peter Xu


Reply via email to