Re: [Qemu-devel] [PATCH v5 07/17] migration: Create multifd migration threads

2017-08-08 Thread Juan Quintela
"Dr. David Alan Gilbert"  wrote:
> * Juan Quintela (quint...@redhat.com) wrote:
>> Creation of the threads, nothing inside yet.
>> 
>> Signed-off-by: Juan Quintela 

>> +        MultiFDSendParams *p = &multifd_send_state->params[i];
>> +
>> +        qemu_mutex_lock(&p->mutex);
>> +        p->quit = true;
>> +        qemu_sem_post(&p->sem);
>> +        qemu_mutex_unlock(&p->mutex);
>
> I don't think you need that lock/unlock pair - as long as no one
> else is currently going around setting them to false; so as long
> as you know you're safely after initialisation and no one is trying
> to start a new migration at the moment then I think it's safe.

It is the error path, or the end of migration.  I get very nervous with
*I think that it is safe*, and especially because I then lose the "moral
authority" when somebody else tries *not* to protect some access to a
variable.

If you prefer it, I can change that to an atomic, but I am not sure
that it would make a big difference.

And yes, in this case it is probably OK, because the sem_post() should
synchronize everything needed, but I am not an expert in all
architectures; better safe than sorry, no?
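
For reference, a minimal standalone sketch of what the atomic variant
could look like.  It uses plain pthreads, POSIX semaphores and C11
atomics rather than the QEMU wrappers, and the Worker type and the
worker_* names are hypothetical, not part of the patch:

```c
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical stand-in for MultiFDSendParams; not the QEMU struct. */
typedef struct {
    pthread_t thread;
    sem_t sem;
    atomic_bool quit;   /* written by the controller, read by the worker */
} Worker;

static void *worker_thread(void *opaque)
{
    Worker *w = opaque;

    /* The acquire load pairs with the release store in worker_terminate(). */
    while (!atomic_load_explicit(&w->quit, memory_order_acquire)) {
        /* sem_wait() may fail with EINTR; either way, re-check quit. */
        sem_wait(&w->sem);
    }
    return NULL;
}

/* Returns 0 on success, -1 if the semaphore or thread cannot be created. */
static int worker_start(Worker *w)
{
    atomic_init(&w->quit, false);
    if (sem_init(&w->sem, 0, 0) != 0) {
        return -1;
    }
    if (pthread_create(&w->thread, NULL, worker_thread, w) != 0) {
        sem_destroy(&w->sem);
        return -1;
    }
    return 0;
}

/* No mutex: the flag is set atomically, then the worker is woken up. */
static void worker_terminate(Worker *w)
{
    atomic_store_explicit(&w->quit, true, memory_order_release);
    sem_post(&w->sem);
}

/* Must be called after worker_terminate(); returns pthread_join()'s result. */
static int worker_join(Worker *w)
{
    int rc;

    assert(atomic_load(&w->quit));
    rc = pthread_join(w->thread, NULL);
    sem_destroy(&w->sem);
    return rc;
}
```

A caller would use worker_start(), and later worker_terminate() followed
by worker_join().  The release store paired with the acquire load is
what replaces the lock/unlock pair here; whether that buys anything over
the mutex is exactly the open question above.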

>> +    }
>> +}
>> +
>> +void multifd_save_cleanup(void)
>> +{
>> +    int i;
>> +
>> +    if (!migrate_use_multifd()) {
>> +        return;
>> +    }
>> +    terminate_multifd_send_threads();
>> +    for (i = 0; i < multifd_send_state->count; i++) {
>> +        MultiFDSendParams *p = &multifd_send_state->params[i];
>> +
>> +        qemu_thread_join(&p->thread);
>> +        qemu_mutex_destroy(&p->mutex);
>> +        qemu_sem_destroy(&p->sem);
>> +    }
>> +    g_free(multifd_send_state->params);
>> +    multifd_send_state->params = NULL;
>> +    g_free(multifd_send_state);
>> +    multifd_send_state = NULL;
>
> I'd be tempted to add a few traces around here, and also some
> protection against it being called twice.  Maybe it shouldn't
> happen, but it would be nice to debug it when it does.

I can change it like I do on the reception side.  As it is an array of
pointers, I can easily make them point to NULL.  What do you think?
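
To illustrate the protection against a second call, here is a minimal
standalone sketch using plain calloc/free.  SendState, send_setup() and
send_cleanup() are hypothetical stand-ins, not the QEMU code; the only
idea shown is that a NULL state pointer means "nothing to clean up":

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical stand-in for the multifd_send_state structure. */
typedef struct {
    int *params;
    int count;
} SendState;

static SendState *send_state;   /* NULL while not set up */

/* Returns 0 on success, -1 on bad input, double setup or allocation failure. */
static int send_setup(int n)
{
    if (n <= 0 || send_state) {
        return -1;
    }
    send_state = calloc(1, sizeof(*send_state));
    if (!send_state) {
        return -1;
    }
    send_state->params = calloc((size_t)n, sizeof(*send_state->params));
    if (!send_state->params) {
        free(send_state);
        send_state = NULL;
        return -1;
    }
    send_state->count = n;
    return 0;
}

/* Returns 1 if state was released, 0 if there was nothing to do. */
static int send_cleanup(void)
{
    if (!send_state) {
        /* Already cleaned up, or never set up: a good spot for a trace. */
        return 0;
    }
    free(send_state->params);
    send_state->params = NULL;
    free(send_state);
    send_state = NULL;
    return 1;
}
```

With this shape a second send_cleanup() call becomes a harmless no-op
instead of dereferencing a pointer that was already set to NULL.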

>
>> +}
>> +
>> +static void *multifd_send_thread(void *opaque)
>> +{
>> +    MultiFDSendParams *p = opaque;
>> +
>> +    while (true) {
>> +        qemu_mutex_lock(&p->mutex);
>> +        if (p->quit) {
>> +            qemu_mutex_unlock(&p->mutex);
>> +            break;
>> +        }
>> +        qemu_mutex_unlock(&p->mutex);
>> +        qemu_sem_wait(&p->sem);
>
> Similar to above, I don't think you need those
> locks around the quit check.

For POSIX it is not strictly needed:



 Applications shall ensure that access to any memory location by more
 than one thread of control (threads or processes) is restricted such
 that no thread of control can read or modify a memory location while
 another thread of control may be modifying it. Such access is restricted
 using functions that synchronize thread execution and also synchronize
 memory with respect to other threads. The following functions
 synchronize memory with respect to other threads:

 fork() pthread_barrier_wait() pthread_cond_broadcast()
 pthread_cond_signal() pthread_cond_timedwait() pthread_cond_wait()
 pthread_create() pthread_join() pthread_mutex_lock()
 pthread_mutex_timedlock()

 pthread_mutex_trylock() pthread_mutex_unlock() pthread_spin_lock()
 pthread_spin_trylock() pthread_spin_unlock() pthread_rwlock_rdlock()
 pthread_rwlock_timedrdlock() pthread_rwlock_timedwrlock()
 pthread_rwlock_tryrdlock() pthread_rwlock_trywrlock()

 pthread_rwlock_unlock() pthread_rwlock_wrlock() sem_post()
 sem_timedwait() sem_trywait() sem_wait() semctl() semop() wait()
 waitpid()

sem_wait() synchronizes memory, but when we add more code to the mix, we
need to make sure that we call some function that synchronizes memory
there.  My experience is that this gets us into trouble down the road.

For starters, I never remember this list of functions from memory,
and secondly, on x86, if you are just assigning a variable, it works
correctly almost always, so we always get the bugs on other
architectures.
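
As a small illustration of the guarantee quoted above, the following
standalone sketch (plain POSIX, hypothetical names) hands a non-atomic
int from one thread to another, relying only on sem_post()/sem_wait()
from that list to synchronize memory:

```c
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>

/* Hypothetical example type; nothing here comes from the patch. */
typedef struct {
    sem_t ready;
    int payload;   /* plain, non-atomic: written before sem_post() */
    int seen;      /* what the consumer read after sem_wait() */
} Handoff;

static void *consumer(void *opaque)
{
    Handoff *h = opaque;

    /* Retry if sem_wait() is interrupted by a signal. */
    while (sem_wait(&h->ready) != 0) {
    }
    /* sem_wait() synchronized memory, so the write to payload is visible. */
    h->seen = h->payload;
    return NULL;
}

/* Returns the value the consumer observed, or -1 on setup failure. */
static int handoff_run(int value)
{
    Handoff h = { .payload = 0, .seen = -1 };
    pthread_t t;
    int rc;

    if (sem_init(&h.ready, 0, 0) != 0) {
        return -1;
    }
    if (pthread_create(&t, NULL, consumer, &h) != 0) {
        sem_destroy(&h.ready);
        return -1;
    }
    h.payload = value;          /* plain store ... */
    rc = sem_post(&h.ready);    /* ... published by sem_post() */
    assert(rc == 0);
    (void)rc;
    pthread_join(t, NULL);      /* pthread_join() is on the list as well */
    sem_destroy(&h.ready);
    return h.seen;
}
```

The handoff works only because every access to payload sits on the right
side of a call from the list; add an access outside of that and the
guarantee is gone, which is the trap described above.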


>> +int multifd_load_setup(void)
>> +{
>> +    int thread_count;
>> +    uint8_t i;
>> +
>> +    if (!migrate_use_multifd()) {
>> +        return 0;
>> +    }
>> +    thread_count = migrate_multifd_threads();
>> +    multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
>> +    multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
>> +    multifd_recv_state->count = 0;
>> +    for (i = 0; i < thread_count; i++) {
>> +        char thread_name[16];
>> +        MultiFDRecvParams *p = &multifd_recv_state->params[i];
>> +
>> +        qemu_mutex_init(&p->mutex);
>> +        qemu_sem_init(&p->sem, 0);
>> +        p->quit = false;
>> +        p->id = i;
>> +        snprintf(thread_name, sizeof(thread_name), "multifdrecv_%d", i);
>> +        qemu_thread_create(&p->thread, thread_name, multifd_recv_thread, p,
>> +                           QEMU_THREAD_JOINABLE);
>> +multifd_r

Re: [Qemu-devel] [PATCH v5 07/17] migration: Create multifd migration threads

2017-07-19 Thread Dr. David Alan Gilbert
* Juan Quintela (quint...@redhat.com) wrote:
> Creation of the threads, nothing inside yet.
> 
> Signed-off-by: Juan Quintela 
> 
> --
> 
> Use pointers instead of long array names
> Move to use semaphores instead of conditions, as per Paolo's suggestion
> 
> Put all the state inside one struct.
> Use a counter for the number of threads created.  Needed during cancellation.
> 
> Add error return to thread creation
> 
> Add id field
> 
> Rename functions to multifd_save/load_setup/cleanup
> ---
>  migration/migration.c |  14 
>  migration/ram.c   | 192 ++
>  migration/ram.h   |   5 ++
>  3 files changed, 211 insertions(+)
> 
> diff --git a/migration/migration.c b/migration/migration.c
> index ff3fc9d..5a82c1c 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -288,6 +288,7 @@ static void process_incoming_migration_bh(void *opaque)
>      } else {
>          runstate_set(global_state_get_runstate());
>      }
> +    multifd_load_cleanup();
>      /*
>       * This must happen after any state changes since as soon as an external
>       * observer sees this event they might start to prod at the VM assuming
> @@ -348,6 +349,7 @@ static void process_incoming_migration_co(void *opaque)
>          migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE,
>                            MIGRATION_STATUS_FAILED);
>          error_report("load of migration failed: %s", strerror(-ret));
> +        multifd_load_cleanup();
>          exit(EXIT_FAILURE);
>      }
>      mis->bh = qemu_bh_new(process_incoming_migration_bh, mis);
> @@ -358,6 +360,11 @@ void migration_fd_process_incoming(QEMUFile *f)
>  {
>      Coroutine *co = qemu_coroutine_create(process_incoming_migration_co, f);
>  
> +    if (multifd_load_setup() != 0) {
> +        /* We haven't been able to create multifd threads
> +           nothing better to do */
> +        exit(EXIT_FAILURE);
> +    }
>      qemu_file_set_blocking(f, false);
>      qemu_coroutine_enter(co);
>  }
> @@ -860,6 +867,7 @@ static void migrate_fd_cleanup(void *opaque)
>          }
>          qemu_mutex_lock_iothread();
>  
> +        multifd_save_cleanup();
>          qemu_fclose(s->to_dst_file);
>          s->to_dst_file = NULL;
>      }
> @@ -2049,6 +2057,12 @@ void migrate_fd_connect(MigrationState *s)
>          }
>      }
>  
> +    if (multifd_save_setup() != 0) {
> +        migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
> +                          MIGRATION_STATUS_FAILED);
> +        migrate_fd_cleanup(s);
> +        return;
> +    }
>      qemu_thread_create(&s->thread, "live_migration", migration_thread, s,
>                         QEMU_THREAD_JOINABLE);
>      s->migration_thread_running = true;
> diff --git a/migration/ram.c b/migration/ram.c
> index 1b08296..8e87533 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -356,6 +356,198 @@ static void compress_threads_save_setup(void)
>      }
>  }
>  
> +/* Multiple fd's */
> +
> +struct MultiFDSendParams {
> +    uint8_t id;
> +    QemuThread thread;
> +    QemuSemaphore sem;
> +    QemuMutex mutex;
> +    bool quit;
> +};
> +typedef struct MultiFDSendParams MultiFDSendParams;
> +
> +struct {
> +    MultiFDSendParams *params;
> +    /* number of created threads */
> +    int count;
> +} *multifd_send_state;
> +
> +static void terminate_multifd_send_threads(void)
> +{
> +    int i;
> +
> +    for (i = 0; i < multifd_send_state->count; i++) {
> +        MultiFDSendParams *p = &multifd_send_state->params[i];
> +
> +        qemu_mutex_lock(&p->mutex);
> +        p->quit = true;
> +        qemu_sem_post(&p->sem);
> +        qemu_mutex_unlock(&p->mutex);

I don't think you need that lock/unlock pair - as long as no one
else is currently going around setting them to false; so as long
as you know you're safely after initialisation and no one is trying
to start a new migration at the moment then I think it's safe.

> +    }
> +}
> +
> +void multifd_save_cleanup(void)
> +{
> +    int i;
> +
> +    if (!migrate_use_multifd()) {
> +        return;
> +    }
> +    terminate_multifd_send_threads();
> +    for (i = 0; i < multifd_send_state->count; i++) {
> +        MultiFDSendParams *p = &multifd_send_state->params[i];
> +
> +        qemu_thread_join(&p->thread);
> +        qemu_mutex_destroy(&p->mutex);
> +        qemu_sem_destroy(&p->sem);
> +    }
> +    g_free(multifd_send_state->params);
> +    multifd_send_state->params = NULL;
> +    g_free(multifd_send_state);
> +    multifd_send_state = NULL;

I'd be tempted to add a few traces around here, and also some
protection against it being called twice.  Maybe it shouldn't
happen, but it would be nice to debug it when it does.

> +}
> +
> +static void *multifd_send_thread(void *opaque)
> +{
> +    MultiFDSendParams *p = opaque;
> +
> +    while (true) {
> +        qemu_mutex_lock(&p->mutex);
> +        if (p->quit) {
> +            qemu_mutex_unlock(&p->mutex);
> +            break;
> +        }
> +