* Het Gala (het.g...@nutanix.com) wrote: > i) Dynamically decide the appropriate source and destination IP pair for each > multifd channel to be connected. > > ii) Remove support for setting the number of multifd channels via QMP > commands, since all multifd parameters are now passed through the QMP migrate > command or the incoming flag itself.
We can't do that, because it's part of the API already; what you'll need to do is check that the number of entries in your list corresponds to the value set there and error if it's different. Dave > Suggested-by: Manish Mishra <manish.mis...@nutanix.com> > Signed-off-by: Het Gala <het.g...@nutanix.com> > --- > migration/migration.c | 15 --------------- > migration/migration.h | 1 - > migration/multifd.c | 42 +++++++++++++++++++++--------------------- > migration/socket.c | 42 +++++++++++++++++++++++++++++++++--------- > migration/socket.h | 4 +++- > monitor/hmp-cmds.c | 4 ---- > qapi/migration.json | 6 ------ > 7 files changed, 57 insertions(+), 57 deletions(-) > > diff --git a/migration/migration.c b/migration/migration.c > index 9b0ad732e7..57dd4494b4 100644 > --- a/migration/migration.c > +++ b/migration/migration.c > @@ -1585,9 +1585,6 @@ static void > migrate_params_test_apply(MigrateSetParameters *params, > if (params->has_block_incremental) { > dest->block_incremental = params->block_incremental; > } > - if (params->has_multifd_channels) { > - dest->multifd_channels = params->multifd_channels; > - } > if (params->has_multifd_compression) { > dest->multifd_compression = params->multifd_compression; > } > @@ -1702,9 +1699,6 @@ static void migrate_params_apply(MigrateSetParameters > *params, Error **errp) > if (params->has_block_incremental) { > s->parameters.block_incremental = params->block_incremental; > } > - if (params->has_multifd_channels) { > - s->parameters.multifd_channels = params->multifd_channels; > - } > if (params->has_multifd_compression) { > s->parameters.multifd_compression = params->multifd_compression; > } > @@ -2686,15 +2680,6 @@ bool migrate_pause_before_switchover(void) > MIGRATION_CAPABILITY_PAUSE_BEFORE_SWITCHOVER]; > } > > -int migrate_multifd_channels(void) > -{ > - MigrationState *s; > - > - s = migrate_get_current(); > - > - return s->parameters.multifd_channels; > -} > - > MultiFDCompression migrate_multifd_compression(void) > { > 
MigrationState *s; > diff --git a/migration/migration.h b/migration/migration.h > index fa8717ec9e..9464de8ef7 100644 > --- a/migration/migration.h > +++ b/migration/migration.h > @@ -372,7 +372,6 @@ bool migrate_validate_uuid(void); > bool migrate_auto_converge(void); > bool migrate_use_multifd(void); > bool migrate_pause_before_switchover(void); > -int migrate_multifd_channels(void); > MultiFDCompression migrate_multifd_compression(void); > int migrate_multifd_zlib_level(void); > int migrate_multifd_zstd_level(void); > diff --git a/migration/multifd.c b/migration/multifd.c > index 9282ab6aa4..ce017436fb 100644 > --- a/migration/multifd.c > +++ b/migration/multifd.c > @@ -225,7 +225,7 @@ static int multifd_recv_initial_packet(QIOChannel *c, > Error **errp) > return -1; > } > > - if (msg.id > migrate_multifd_channels()) { > + if (msg.id > total_multifd_channels()) { > error_setg(errp, "multifd: received channel version %u " > "expected %u", msg.version, MULTIFD_VERSION); > return -1; > @@ -410,8 +410,8 @@ static int multifd_send_pages(QEMUFile *f) > * using more channels, so ensure it doesn't overflow if the > * limit is lower now. 
> */ > - next_channel %= migrate_multifd_channels(); > - for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) { > + next_channel %= total_multifd_channels(); > + for (i = next_channel;; i = (i + 1) % total_multifd_channels()) { > p = &multifd_send_state->params[i]; > > qemu_mutex_lock(&p->mutex); > @@ -422,7 +422,7 @@ static int multifd_send_pages(QEMUFile *f) > } > if (!p->pending_job) { > p->pending_job++; > - next_channel = (i + 1) % migrate_multifd_channels(); > + next_channel = (i + 1) % total_multifd_channels(); > break; > } > qemu_mutex_unlock(&p->mutex); > @@ -500,7 +500,7 @@ static void multifd_send_terminate_threads(Error *err) > return; > } > > - for (i = 0; i < migrate_multifd_channels(); i++) { > + for (i = 0; i < total_multifd_channels(); i++) { > MultiFDSendParams *p = &multifd_send_state->params[i]; > > qemu_mutex_lock(&p->mutex); > @@ -521,14 +521,14 @@ void multifd_save_cleanup(void) > return; > } > multifd_send_terminate_threads(NULL); > - for (i = 0; i < migrate_multifd_channels(); i++) { > + for (i = 0; i < total_multifd_channels(); i++) { > MultiFDSendParams *p = &multifd_send_state->params[i]; > > if (p->running) { > qemu_thread_join(&p->thread); > } > } > - for (i = 0; i < migrate_multifd_channels(); i++) { > + for (i = 0; i < total_multifd_channels(); i++) { > MultiFDSendParams *p = &multifd_send_state->params[i]; > Error *local_err = NULL; > > @@ -594,7 +594,7 @@ int multifd_send_sync_main(QEMUFile *f) > > flush_zero_copy = migrate_use_zero_copy_send(); > > - for (i = 0; i < migrate_multifd_channels(); i++) { > + for (i = 0; i < total_multifd_channels(); i++) { > MultiFDSendParams *p = &multifd_send_state->params[i]; > > trace_multifd_send_sync_main_signal(p->id); > @@ -627,7 +627,7 @@ int multifd_send_sync_main(QEMUFile *f) > } > } > } > - for (i = 0; i < migrate_multifd_channels(); i++) { > + for (i = 0; i < total_multifd_channels(); i++) { > MultiFDSendParams *p = &multifd_send_state->params[i]; > > 
trace_multifd_send_sync_main_wait(p->id); > @@ -903,7 +903,7 @@ int multifd_save_setup(Error **errp) > int thread_count; > uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); > uint8_t i; > - > + int idx; > if (!migrate_use_multifd()) { > return 0; > } > @@ -912,7 +912,7 @@ int multifd_save_setup(Error **errp) > return -1; > } > > - thread_count = migrate_multifd_channels(); > + thread_count = total_multifd_channels(); > multifd_send_state = g_malloc0(sizeof(*multifd_send_state)); > multifd_send_state->params = g_new0(MultiFDSendParams, thread_count); > multifd_send_state->pages = multifd_pages_init(page_count); > @@ -945,8 +945,8 @@ int multifd_save_setup(Error **errp) > } else { > p->write_flags = 0; > } > - > - socket_send_channel_create(multifd_new_send_channel_async, p); > + idx = multifd_index(i); > + socket_send_channel_create(multifd_new_send_channel_async, p, idx); > } > > for (i = 0; i < thread_count; i++) { > @@ -991,7 +991,7 @@ static void multifd_recv_terminate_threads(Error *err) > } > } > > - for (i = 0; i < migrate_multifd_channels(); i++) { > + for (i = 0; i < total_multifd_channels(); i++) { > MultiFDRecvParams *p = &multifd_recv_state->params[i]; > > qemu_mutex_lock(&p->mutex); > @@ -1017,7 +1017,7 @@ int multifd_load_cleanup(Error **errp) > return 0; > } > multifd_recv_terminate_threads(NULL); > - for (i = 0; i < migrate_multifd_channels(); i++) { > + for (i = 0; i < total_multifd_channels(); i++) { > MultiFDRecvParams *p = &multifd_recv_state->params[i]; > > if (p->running) { > @@ -1030,7 +1030,7 @@ int multifd_load_cleanup(Error **errp) > qemu_thread_join(&p->thread); > } > } > - for (i = 0; i < migrate_multifd_channels(); i++) { > + for (i = 0; i < total_multifd_channels(); i++) { > MultiFDRecvParams *p = &multifd_recv_state->params[i]; > > migration_ioc_unregister_yank(p->c); > @@ -1065,13 +1065,13 @@ void multifd_recv_sync_main(void) > if (!migrate_use_multifd()) { > return; > } > - for (i = 0; i < 
migrate_multifd_channels(); i++) { > + for (i = 0; i < total_multifd_channels(); i++) { > MultiFDRecvParams *p = &multifd_recv_state->params[i]; > > trace_multifd_recv_sync_main_wait(p->id); > qemu_sem_wait(&multifd_recv_state->sem_sync); > } > - for (i = 0; i < migrate_multifd_channels(); i++) { > + for (i = 0; i < total_multifd_channels(); i++) { > MultiFDRecvParams *p = &multifd_recv_state->params[i]; > > WITH_QEMU_LOCK_GUARD(&p->mutex) { > @@ -1166,7 +1166,7 @@ int multifd_load_setup(Error **errp) > error_setg(errp, "multifd is not supported by current protocol"); > return -1; > } > - thread_count = migrate_multifd_channels(); > + thread_count = total_multifd_channels(); > multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state)); > multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count); > qatomic_set(&multifd_recv_state->count, 0); > @@ -1204,7 +1204,7 @@ int multifd_load_setup(Error **errp) > > bool multifd_recv_all_channels_created(void) > { > - int thread_count = migrate_multifd_channels(); > + int thread_count = total_multifd_channels(); > > if (!migrate_use_multifd()) { > return true; > @@ -1259,5 +1259,5 @@ bool multifd_recv_new_channel(QIOChannel *ioc, Error > **errp) > QEMU_THREAD_JOINABLE); > qatomic_inc(&multifd_recv_state->count); > return qatomic_read(&multifd_recv_state->count) == > - migrate_multifd_channels(); > + total_multifd_channels(); > } > diff --git a/migration/socket.c b/migration/socket.c > index d0cb7cc6a6..c0ac6dbbe2 100644 > --- a/migration/socket.c > +++ b/migration/socket.c > @@ -28,9 +28,6 @@ > #include "trace.h" > > > -struct SocketOutgoingArgs { > - SocketAddress *saddr; > -} outgoing_args; > > struct SocketArgs { > struct SrcDestAddr data; > @@ -43,20 +40,47 @@ struct OutgoingMigrateParams { > uint64_t total_multifd_channel; > } outgoing_migrate_params; > > -void socket_send_channel_create(QIOTaskFunc f, void *data) > + > +int total_multifd_channels(void) > +{ > + return 
outgoing_migrate_params.total_multifd_channel; > +} > + > +int multifd_index(int i) > +{ > + int length = outgoing_migrate_params.length; > + int j = 0; > + int runn_sum = 0; > + while (j < length) { > + runn_sum += outgoing_migrate_params.socket_args[j].multifd_channels; > + if (i >= runn_sum) { > + j++; > + } else { > + break; > + } > + } > + return j; > +} > + > +void socket_send_channel_create(QIOTaskFunc f, void *data, int idx) > { > QIOChannelSocket *sioc = qio_channel_socket_new(); > - qio_channel_socket_connect_async(sioc, outgoing_args.saddr, > - f, data, NULL, NULL, NULL); > + qio_channel_socket_connect_async(sioc, > + > outgoing_migrate_params.socket_args[idx].data.dst_addr, > + f, data, NULL, NULL, > + > outgoing_migrate_params.socket_args[idx].data.src_addr); > } > > int socket_send_channel_destroy(QIOChannel *send) > { > /* Remove channel */ > object_unref(OBJECT(send)); > - if (outgoing_args.saddr) { > - qapi_free_SocketAddress(outgoing_args.saddr); > - outgoing_args.saddr = NULL; > + if (outgoing_migrate_params.socket_args != NULL) { > + g_free(outgoing_migrate_params.socket_args); > + outgoing_migrate_params.socket_args = NULL; > + } > + if (outgoing_migrate_params.length) { > + outgoing_migrate_params.length = 0; > } > > if (outgoing_migrate_params.socket_args != NULL) { > diff --git a/migration/socket.h b/migration/socket.h > index b9e3699167..c8b9252384 100644 > --- a/migration/socket.h > +++ b/migration/socket.h > @@ -27,7 +27,9 @@ struct SrcDestAddr { > SocketAddress *src_addr; > }; > > -void socket_send_channel_create(QIOTaskFunc f, void *data); > +int total_multifd_channels(void); > +int multifd_index(int i); > +void socket_send_channel_create(QIOTaskFunc f, void *data, int idx); > int socket_send_channel_destroy(QIOChannel *send); > > void socket_start_incoming_migration(const char *str, uint8_t number, > diff --git a/monitor/hmp-cmds.c b/monitor/hmp-cmds.c > index 32a6b67d5f..9a3d76d6ba 100644 > --- a/monitor/hmp-cmds.c > +++ 
b/monitor/hmp-cmds.c > @@ -1281,10 +1281,6 @@ void hmp_migrate_set_parameter(Monitor *mon, const > QDict *qdict) > p->has_block_incremental = true; > visit_type_bool(v, param, &p->block_incremental, &err); > break; > - case MIGRATION_PARAMETER_MULTIFD_CHANNELS: > - p->has_multifd_channels = true; > - visit_type_uint8(v, param, &p->multifd_channels, &err); > - break; > case MIGRATION_PARAMETER_MULTIFD_COMPRESSION: > p->has_multifd_compression = true; > visit_type_MultiFDCompression(v, param, &p->multifd_compression, > diff --git a/qapi/migration.json b/qapi/migration.json > index 62a7b22d19..1b1c6d01d3 100644 > --- a/qapi/migration.json > +++ b/qapi/migration.json > @@ -877,11 +877,6 @@ > # migrated and the destination must already have access > to the > # same backing chain as was used on the source. (since > 2.10) > # > -# @multifd-channels: Number of channels used to migrate data in > -# parallel. This is the same number that the > -# number of sockets used for migration. The > -# default value is 2 (since 4.0) > -# > # @xbzrle-cache-size: cache size to be used by XBZRLE migration. It > # needs to be a multiple of the target page size > # and a power of 2 > @@ -965,7 +960,6 @@ > '*x-checkpoint-delay': { 'type': 'uint32', > 'features': [ 'unstable' ] }, > '*block-incremental': 'bool', > - '*multifd-channels': 'uint8', > '*xbzrle-cache-size': 'size', > '*max-postcopy-bandwidth': 'size', > '*max-cpu-throttle': 'uint8', > -- > 2.22.3 > -- Dr. David Alan Gilbert / dgilb...@redhat.com / Manchester, UK