Re: [Qemu-devel] [PATCH 06/16] nbd: do not block on partial reply header reads
On 18/01/2017 16:24, Stefan Hajnoczi wrote:
> On Fri, Jan 13, 2017 at 02:17:21PM +0100, Paolo Bonzini wrote:
>> @@ -65,54 +67,34 @@ static void nbd_teardown_connection(BlockDriverState *bs)
>> client->ioc = NULL;
>> }
>>
>> -static void nbd_reply_ready(void *opaque)
>> +static void nbd_read_reply_entry(void *opaque)
>
> Please use coroutine_fn.

Yes, you requested that in the RFC as well.

Paolo
Re: [Qemu-devel] [PATCH 06/16] nbd: do not block on partial reply header reads
On Fri, Jan 13, 2017 at 02:17:21PM +0100, Paolo Bonzini wrote:
> @@ -65,54 +67,34 @@ static void nbd_teardown_connection(BlockDriverState *bs)
> client->ioc = NULL;
> }
>
> -static void nbd_reply_ready(void *opaque)
> +static void nbd_read_reply_entry(void *opaque)

Please use coroutine_fn.

[Attachment: signature.asc (Description: PGP signature)]
Re: [Qemu-devel] [PATCH 06/16] nbd: do not block on partial reply header reads
On 16/01/2017 13:52, Fam Zheng wrote:
> +/* Kick the read_reply_co to get the next reply. */
> +aio_co_wake(s->read_reply_co);
>
> Can't s->read_reply_co be NULL? nbd_read_reply_entry unsets it. (Surprisingly
> this file is rather unfamiliar to me, it's possible I'm missing something.)

Yes, that can happen depending on how the coroutines are scheduled when
the server goes down.

Paolo
Re: [Qemu-devel] [PATCH 06/16] nbd: do not block on partial reply header reads
On Fri, 01/13 14:17, Paolo Bonzini wrote: > Read the replies from a coroutine, switching the read side between the > "read header" coroutine and the I/O coroutine that reads the body of > the reply. > > qio_channel_yield is used so that the right coroutine is restarted > automatically, eliminating the need for send_coroutine in > NBDClientSession. > > Signed-off-by: Paolo Bonzini > --- > block/nbd-client.c | 108 > + > block/nbd-client.h | 2 +- > nbd/client.c | 2 +- > nbd/common.c | 9 + > 4 files changed, 45 insertions(+), 76 deletions(-) > > @@ -65,54 +67,34 @@ static void nbd_teardown_connection(BlockDriverState *bs) > client->ioc = NULL; > } > > -static void nbd_reply_ready(void *opaque) > +static void nbd_read_reply_entry(void *opaque) > { > -BlockDriverState *bs = opaque; > -NBDClientSession *s = nbd_get_client_session(bs); > +NBDClientSession *s = opaque; > uint64_t i; > int ret; > > -if (!s->ioc) { /* Already closed */ > -return; > -} > - > -if (s->reply.handle == 0) { > -/* No reply already in flight. Fetch a header. It is possible > - * that another thread has done the same thing in parallel, so > - * the socket is not readable anymore. > - */ > +for (;;) { > +assert(s->reply.handle == 0); > ret = nbd_receive_reply(s->ioc, &s->reply); > -if (ret == -EAGAIN) { > -return; > -} > if (ret < 0) { > -s->reply.handle = 0; > -goto fail; > +break; > } > -} > - > -/* There's no need for a mutex on the receive side, because the > - * handler acts as a synchronization point and ensures that only > - * one coroutine is called until the reply finishes. 
*/ > -i = HANDLE_TO_INDEX(s, s->reply.handle); > -if (i >= MAX_NBD_REQUESTS) { > -goto fail; > -} > > -if (s->recv_coroutine[i]) { > -qemu_coroutine_enter(s->recv_coroutine[i]); > -return; > -} > - > -fail: > -nbd_teardown_connection(bs); > -} > +/* There's no need for a mutex on the receive side, because the > + * handler acts as a synchronization point and ensures that only > + * one coroutine is called until the reply finishes. > + */ > +i = HANDLE_TO_INDEX(s, s->reply.handle); > +if (i >= MAX_NBD_REQUESTS || !s->recv_coroutine[i]) { > +break; > +} > > -static void nbd_restart_write(void *opaque) > -{ > -BlockDriverState *bs = opaque; > +aio_co_wake(s->recv_coroutine[i]); > > -qemu_coroutine_enter(nbd_get_client_session(bs)->send_coroutine); > +/* We're woken up by the recv_coroutine itself. */ "Wait until we're woken by ..." ? > +qemu_coroutine_yield(); > +} > +s->read_reply_co = NULL; > } > > static int nbd_co_send_request(BlockDriverState *bs, > @@ -120,7 +102,6 @@ static int nbd_co_send_request(BlockDriverState *bs, > QEMUIOVector *qiov) > { > NBDClientSession *s = nbd_get_client_session(bs); > -AioContext *aio_context; > int rc, ret, i; > > qemu_co_mutex_lock(&s->send_mutex); > @@ -141,11 +122,6 @@ static int nbd_co_send_request(BlockDriverState *bs, > return -EPIPE; > } > > -s->send_coroutine = qemu_coroutine_self(); > -aio_context = bdrv_get_aio_context(bs); > - > -aio_set_fd_handler(aio_context, s->sioc->fd, false, > - nbd_reply_ready, nbd_restart_write, NULL, bs); > if (qiov) { > qio_channel_set_cork(s->ioc, true); > rc = nbd_send_request(s->ioc, request); > @@ -160,9 +136,6 @@ static int nbd_co_send_request(BlockDriverState *bs, > } else { > rc = nbd_send_request(s->ioc, request); > } > -aio_set_fd_handler(aio_context, s->sioc->fd, false, > - nbd_reply_ready, NULL, NULL, bs); > -s->send_coroutine = NULL; > qemu_co_mutex_unlock(&s->send_mutex); > return rc; > } > @@ -174,8 +147,7 @@ static void nbd_co_receive_reply(NBDClientSession *s, > { > int ret; > 
> -/* Wait until we're woken up by the read handler. TODO: perhaps > - * peek at the next reply and avoid yielding if it's ours? */ > +/* Wait until we're woken up by nbd_read_reply_entry. */ > qemu_coroutine_yield(); > *reply = s->reply; > if (reply->handle != request->handle || > @@ -209,14 +181,18 @@ static void nbd_coroutine_start(NBDClientSession *s, > /* s->recv_coroutine[i] is set as soon as we get the send_lock. */ > } > > -static void nbd_coroutine_end(NBDClientSession *s, > +static void nbd_coroutine_end(BlockDriverState *bs, >NBDRequest *request) > { > +NBDClientSession *s = nbd_get_client_session(bs); > int i = HANDLE_TO_INDEX(s, request->handle); > + > s->recv_coro
[Qemu-devel] [PATCH 06/16] nbd: do not block on partial reply header reads
Read the replies from a coroutine, switching the read side between the "read header" coroutine and the I/O coroutine that reads the body of the reply. qio_channel_yield is used so that the right coroutine is restarted automatically, eliminating the need for send_coroutine in NBDClientSession. Signed-off-by: Paolo Bonzini --- block/nbd-client.c | 108 + block/nbd-client.h | 2 +- nbd/client.c | 2 +- nbd/common.c | 9 + 4 files changed, 45 insertions(+), 76 deletions(-) diff --git a/block/nbd-client.c b/block/nbd-client.c index 06f1532..eacc7a5 100644 --- a/block/nbd-client.c +++ b/block/nbd-client.c @@ -33,8 +33,9 @@ #define HANDLE_TO_INDEX(bs, handle) ((handle) ^ ((uint64_t)(intptr_t)bs)) #define INDEX_TO_HANDLE(bs, index) ((index) ^ ((uint64_t)(intptr_t)bs)) -static void nbd_recv_coroutines_enter_all(NBDClientSession *s) +static void nbd_recv_coroutines_enter_all(BlockDriverState *bs) { +NBDClientSession *s = nbd_get_client_session(bs); int i; for (i = 0; i < MAX_NBD_REQUESTS; i++) { @@ -42,6 +43,7 @@ static void nbd_recv_coroutines_enter_all(NBDClientSession *s) qemu_coroutine_enter(s->recv_coroutine[i]); } } +BDRV_POLL_WHILE(bs, s->read_reply_co); } static void nbd_teardown_connection(BlockDriverState *bs) @@ -56,7 +58,7 @@ static void nbd_teardown_connection(BlockDriverState *bs) qio_channel_shutdown(client->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); -nbd_recv_coroutines_enter_all(client); +nbd_recv_coroutines_enter_all(bs); nbd_client_detach_aio_context(bs); object_unref(OBJECT(client->sioc)); @@ -65,54 +67,34 @@ static void nbd_teardown_connection(BlockDriverState *bs) client->ioc = NULL; } -static void nbd_reply_ready(void *opaque) +static void nbd_read_reply_entry(void *opaque) { -BlockDriverState *bs = opaque; -NBDClientSession *s = nbd_get_client_session(bs); +NBDClientSession *s = opaque; uint64_t i; int ret; -if (!s->ioc) { /* Already closed */ -return; -} - -if (s->reply.handle == 0) { -/* No reply already in flight. Fetch a header. 
It is possible - * that another thread has done the same thing in parallel, so - * the socket is not readable anymore. - */ +for (;;) { +assert(s->reply.handle == 0); ret = nbd_receive_reply(s->ioc, &s->reply); -if (ret == -EAGAIN) { -return; -} if (ret < 0) { -s->reply.handle = 0; -goto fail; +break; } -} - -/* There's no need for a mutex on the receive side, because the - * handler acts as a synchronization point and ensures that only - * one coroutine is called until the reply finishes. */ -i = HANDLE_TO_INDEX(s, s->reply.handle); -if (i >= MAX_NBD_REQUESTS) { -goto fail; -} -if (s->recv_coroutine[i]) { -qemu_coroutine_enter(s->recv_coroutine[i]); -return; -} - -fail: -nbd_teardown_connection(bs); -} +/* There's no need for a mutex on the receive side, because the + * handler acts as a synchronization point and ensures that only + * one coroutine is called until the reply finishes. + */ +i = HANDLE_TO_INDEX(s, s->reply.handle); +if (i >= MAX_NBD_REQUESTS || !s->recv_coroutine[i]) { +break; +} -static void nbd_restart_write(void *opaque) -{ -BlockDriverState *bs = opaque; +aio_co_wake(s->recv_coroutine[i]); -qemu_coroutine_enter(nbd_get_client_session(bs)->send_coroutine); +/* We're woken up by the recv_coroutine itself. 
*/ +qemu_coroutine_yield(); +} +s->read_reply_co = NULL; } static int nbd_co_send_request(BlockDriverState *bs, @@ -120,7 +102,6 @@ static int nbd_co_send_request(BlockDriverState *bs, QEMUIOVector *qiov) { NBDClientSession *s = nbd_get_client_session(bs); -AioContext *aio_context; int rc, ret, i; qemu_co_mutex_lock(&s->send_mutex); @@ -141,11 +122,6 @@ static int nbd_co_send_request(BlockDriverState *bs, return -EPIPE; } -s->send_coroutine = qemu_coroutine_self(); -aio_context = bdrv_get_aio_context(bs); - -aio_set_fd_handler(aio_context, s->sioc->fd, false, - nbd_reply_ready, nbd_restart_write, NULL, bs); if (qiov) { qio_channel_set_cork(s->ioc, true); rc = nbd_send_request(s->ioc, request); @@ -160,9 +136,6 @@ static int nbd_co_send_request(BlockDriverState *bs, } else { rc = nbd_send_request(s->ioc, request); } -aio_set_fd_handler(aio_context, s->sioc->fd, false, - nbd_reply_ready, NULL, NULL, bs); -s->send_coroutine = NULL; qemu_co_mutex_unlock(&s->send_mutex); return rc; } @@ -174,8