Peter Xu <pet...@redhat.com> wrote: > On Wed, Jul 19, 2017 at 08:02:39PM +0100, Dr. David Alan Gilbert wrote: >> * Juan Quintela (quint...@redhat.com) wrote:
>> > struct MultiFDSendParams { >> > + /* not changed */ >> > uint8_t id; >> > QemuThread thread; >> > QIOChannel *c; >> > QemuSemaphore sem; >> > QemuMutex mutex; >> > + /* protected by param mutex */ >> > bool quit; >> >> Should probably comment to say what address space address is in - this >> is really a qemu pointer - and that's why we can treat 0 as special? > > I believe this comment is for "address" below. > > Yes, it would be nice to comment it. IIUC it belongs to virtual > address space of QEMU, so it should be okay to use zero as a "special" > value. See new comments. >> >> > + uint8_t *address; >> > + /* protected by multifd mutex */ >> > + bool done; >> >> done needs a comment to explain what it is because >> it sounds similar to quit; I think 'done' is saying that >> the thread is idle having done what was asked? > > Since we know that valid address won't be zero, not sure whether we > can just avoid introducing the "done" field (even, not sure whether we > will need lock when modifying "address", I think not as well? Please > see below). For what I see this, it works like a state machine, and > "address" can be the state: > > +-------- send thread ---------+ > | | > \|/ | > address==0 (IDLE) address!=0 (ACTIVE) > | /|\ > | | > +-------- main thread ---------+ > > Then... It is needed, we change things later in the series. We could treat as a special case page.num == 0. But then we can differentiate the case where we have finished the last round and that we are in the beginning of the new one. 
>> >> > }; >> > typedef struct MultiFDSendParams MultiFDSendParams; >> > >> > @@ -375,6 +381,8 @@ struct { >> > MultiFDSendParams *params; >> > /* number of created threads */ >> > int count; >> > + QemuMutex mutex; >> > + QemuSemaphore sem; >> > } *multifd_send_state; >> > >> > static void terminate_multifd_send_threads(void) >> > @@ -443,6 +451,7 @@ static void *multifd_send_thread(void *opaque) >> > } else { >> > qio_channel_write(p->c, string, MULTIFD_UUID_MSG, &error_abort); >> > } >> > + qemu_sem_post(&multifd_send_state->sem); >> > >> > while (!exit) { >> > qemu_mutex_lock(&p->mutex); >> > @@ -450,6 +459,15 @@ static void *multifd_send_thread(void *opaque) >> > qemu_mutex_unlock(&p->mutex); >> > break; >> > } >> > + if (p->address) { >> > + p->address = 0; >> > + qemu_mutex_unlock(&p->mutex); >> > + qemu_mutex_lock(&multifd_send_state->mutex); >> > + p->done = true; >> > + qemu_mutex_unlock(&multifd_send_state->mutex); >> > + qemu_sem_post(&multifd_send_state->sem); >> > + continue; > > Here instead of setting up address=0 at the entry, can we do this (no > "done" for this time)? > > // send the page before clearing p->address > send_page(p->address); > // clear p->address to switch to "IDLE" state > atomic_set(&p->address, 0); > // tell main thread, in case it's waiting > qemu_sem_post(&multifd_send_state->sem); > > And on the main thread... 
> >> > + } >> > qemu_mutex_unlock(&p->mutex); >> > qemu_sem_wait(&p->sem); >> > } >> > @@ -469,6 +487,8 @@ int multifd_save_setup(void) >> > multifd_send_state = g_malloc0(sizeof(*multifd_send_state)); >> > multifd_send_state->params = g_new0(MultiFDSendParams, thread_count); >> > multifd_send_state->count = 0; >> > + qemu_mutex_init(&multifd_send_state->mutex); >> > + qemu_sem_init(&multifd_send_state->sem, 0); >> > for (i = 0; i < thread_count; i++) { >> > char thread_name[16]; >> > MultiFDSendParams *p = &multifd_send_state->params[i]; >> > @@ -477,6 +497,8 @@ int multifd_save_setup(void) >> > qemu_sem_init(&p->sem, 0); >> > p->quit = false; >> > p->id = i; >> > + p->done = true; >> > + p->address = 0; >> > p->c = socket_send_channel_create(); >> > if (!p->c) { >> > error_report("Error creating a send channel"); >> > @@ -491,6 +513,30 @@ int multifd_save_setup(void) >> > return 0; >> > } >> > >> > +static int multifd_send_page(uint8_t *address) >> > +{ >> > + int i; >> > + MultiFDSendParams *p = NULL; /* make happy gcc */ >> > + > > >> > + qemu_sem_wait(&multifd_send_state->sem); >> > + qemu_mutex_lock(&multifd_send_state->mutex); >> > + for (i = 0; i < multifd_send_state->count; i++) { >> > + p = &multifd_send_state->params[i]; >> > + >> > + if (p->done) { >> > + p->done = false; >> > + break; >> > + } >> > + } >> > + qemu_mutex_unlock(&multifd_send_state->mutex); >> > + qemu_mutex_lock(&p->mutex); >> > + p->address = address; >> > + qemu_mutex_unlock(&p->mutex); >> > + qemu_sem_post(&p->sem); > > ... here can we just do this? 
> > retry: > // don't take any lock, only read each p->address > for (i = 0; i < multifd_send_state->count; i++) { > p = &multifd_send_state->params[i]; > if (!p->address) { > // we found one IDLE send thread > break; > } > } > if (!p) { > qemu_sem_wait(&multifd_send_state->sem); > goto retry; > } > // we switch its state, IDLE -> ACTIVE > atomic_set(&p->address, address); > // tell the thread to start work > qemu_sem_post(&p->sem); > > Above didn't really use any lock at all (either the per thread lock, > or the global lock). Would it work? Probably (surely on x86). But on the "final code", it becomes: qemu_mutex_lock(&multifd_send_state->mutex); for (i = 0; i < multifd_send_state->count; i++) { p = &multifd_send_state->params[i]; if (p->done) { p->done = false; break; } } qemu_mutex_unlock(&multifd_send_state->mutex); qemu_mutex_lock(&p->mutex); p->pages.num = pages->num; iov_copy(p->pages.iov, pages->num, pages->iov, pages->num, 0, iov_size(pages->iov, pages->num)); pages->num = 0; qemu_mutex_unlock(&p->mutex); qemu_sem_post(&p->sem); So, we set done to false, without the global mutex (yes, we can change that for one atomic). But then we copy an iov without a lock? With the other thread checking for pages->num == 0? It sounds a bit fragile to me, no? Later, Juan.