Re: [Qemu-devel] [PATCH 06/16] nbd: do not block on partial reply header reads

2017-01-18 Thread Paolo Bonzini


On 18/01/2017 16:24, Stefan Hajnoczi wrote:
> On Fri, Jan 13, 2017 at 02:17:21PM +0100, Paolo Bonzini wrote:
>> @@ -65,54 +67,34 @@ static void nbd_teardown_connection(BlockDriverState *bs)
>>      client->ioc = NULL;
>>  }
>>  
>> -static void nbd_reply_ready(void *opaque)
>> +static void nbd_read_reply_entry(void *opaque)
> 
> Please use coroutine_fn.

Yes, you requested that in the RFC as well.

Paolo
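
(For reference, the annotation requested here would look something like the
sketch below.  This is an editorial illustration based on the quoted hunk,
not the final committed code; coroutine_fn is an annotation only and does
not change code generation.)

    /* Sketch: coroutine_fn documents that this function must run in
     * coroutine context only, here inside the client's read_reply_co.  */
    static coroutine_fn void nbd_read_reply_entry(void *opaque)
    {
        NBDClientSession *s = opaque;

        /* ... loop body as in the quoted hunk ... */
    }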



Re: [Qemu-devel] [PATCH 06/16] nbd: do not block on partial reply header reads

2017-01-18 Thread Stefan Hajnoczi
On Fri, Jan 13, 2017 at 02:17:21PM +0100, Paolo Bonzini wrote:
> @@ -65,54 +67,34 @@ static void nbd_teardown_connection(BlockDriverState *bs)
>      client->ioc = NULL;
>  }
>  
> -static void nbd_reply_ready(void *opaque)
> +static void nbd_read_reply_entry(void *opaque)

Please use coroutine_fn.




Re: [Qemu-devel] [PATCH 06/16] nbd: do not block on partial reply header reads

2017-01-16 Thread Paolo Bonzini


On 16/01/2017 13:52, Fam Zheng wrote:
> +    /* Kick the read_reply_co to get the next reply.  */
> +    aio_co_wake(s->read_reply_co);
> 
> Can't s->read_reply_co be NULL? nbd_read_reply_entry unsets it. (Surprisingly,
> this file is rather unfamiliar to me; it's possible I'm missing something.)

Yes, that can happen depending on how the coroutines are scheduled when
the server goes down.

Paolo
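
(For illustration, the guard this exchange points at might look like the
sketch below.  This is an editorial sketch assuming the NBDClientSession
fields from the quoted patch, not necessarily the fix that appears in the
next revision of the series.)

    /* Sketch: nbd_read_reply_entry clears s->read_reply_co when it
     * exits, e.g. after the server hangs up, so only kick the
     * coroutine if it is still around.  */
    if (s->read_reply_co) {
        aio_co_wake(s->read_reply_co);
    }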



Re: [Qemu-devel] [PATCH 06/16] nbd: do not block on partial reply header reads

2017-01-16 Thread Fam Zheng
On Fri, 01/13 14:17, Paolo Bonzini wrote:
> Read the replies from a coroutine, switching the read side between the
> "read header" coroutine and the I/O coroutine that reads the body of
> the reply.
> 
> qio_channel_yield is used so that the right coroutine is restarted
> automatically, eliminating the need for send_coroutine in
> NBDClientSession.
> 
> Signed-off-by: Paolo Bonzini 
> ---
>  block/nbd-client.c | 108 +
>  block/nbd-client.h |   2 +-
>  nbd/client.c       |   2 +-
>  nbd/common.c       |   9 +
>  4 files changed, 45 insertions(+), 76 deletions(-)
> 
> @@ -65,54 +67,34 @@ static void nbd_teardown_connection(BlockDriverState *bs)
>      client->ioc = NULL;
>  }
>  
> -static void nbd_reply_ready(void *opaque)
> +static void nbd_read_reply_entry(void *opaque)
>  {
> -    BlockDriverState *bs = opaque;
> -    NBDClientSession *s = nbd_get_client_session(bs);
> +    NBDClientSession *s = opaque;
>      uint64_t i;
>      int ret;
>  
> -    if (!s->ioc) { /* Already closed */
> -        return;
> -    }
> -
> -    if (s->reply.handle == 0) {
> -        /* No reply already in flight.  Fetch a header.  It is possible
> -         * that another thread has done the same thing in parallel, so
> -         * the socket is not readable anymore.
> -         */
> +    for (;;) {
> +        assert(s->reply.handle == 0);
>          ret = nbd_receive_reply(s->ioc, &s->reply);
> -        if (ret == -EAGAIN) {
> -            return;
> -        }
>          if (ret < 0) {
> -            s->reply.handle = 0;
> -            goto fail;
> +            break;
>          }
> -    }
> -
> -    /* There's no need for a mutex on the receive side, because the
> -     * handler acts as a synchronization point and ensures that only
> -     * one coroutine is called until the reply finishes.  */
> -    i = HANDLE_TO_INDEX(s, s->reply.handle);
> -    if (i >= MAX_NBD_REQUESTS) {
> -        goto fail;
> -    }
>  
> -    if (s->recv_coroutine[i]) {
> -        qemu_coroutine_enter(s->recv_coroutine[i]);
> -        return;
> -    }
> -
> -fail:
> -    nbd_teardown_connection(bs);
> -}
> +        /* There's no need for a mutex on the receive side, because the
> +         * handler acts as a synchronization point and ensures that only
> +         * one coroutine is called until the reply finishes.
> +         */
> +        i = HANDLE_TO_INDEX(s, s->reply.handle);
> +        if (i >= MAX_NBD_REQUESTS || !s->recv_coroutine[i]) {
> +            break;
> +        }
>  
> -static void nbd_restart_write(void *opaque)
> -{
> -    BlockDriverState *bs = opaque;
> +        aio_co_wake(s->recv_coroutine[i]);
>  
> -    qemu_coroutine_enter(nbd_get_client_session(bs)->send_coroutine);
> +        /* We're woken up by the recv_coroutine itself.  */

"Wait until we're woken by ..." ?

> +        qemu_coroutine_yield();
> +    }
> +    s->read_reply_co = NULL;
>  }
>  
>  static int nbd_co_send_request(BlockDriverState *bs,
> @@ -120,7 +102,6 @@ static int nbd_co_send_request(BlockDriverState *bs,
>                                 QEMUIOVector *qiov)
>  {
>      NBDClientSession *s = nbd_get_client_session(bs);
> -    AioContext *aio_context;
>      int rc, ret, i;
>  
>      qemu_co_mutex_lock(&s->send_mutex);
> @@ -141,11 +122,6 @@ static int nbd_co_send_request(BlockDriverState *bs,
>          return -EPIPE;
>      }
>  
> -    s->send_coroutine = qemu_coroutine_self();
> -    aio_context = bdrv_get_aio_context(bs);
> -
> -    aio_set_fd_handler(aio_context, s->sioc->fd, false,
> -                       nbd_reply_ready, nbd_restart_write, NULL, bs);
>      if (qiov) {
>          qio_channel_set_cork(s->ioc, true);
>          rc = nbd_send_request(s->ioc, request);
> @@ -160,9 +136,6 @@ static int nbd_co_send_request(BlockDriverState *bs,
>      } else {
>          rc = nbd_send_request(s->ioc, request);
>      }
> -    aio_set_fd_handler(aio_context, s->sioc->fd, false,
> -                       nbd_reply_ready, NULL, NULL, bs);
> -    s->send_coroutine = NULL;
>      qemu_co_mutex_unlock(&s->send_mutex);
>      return rc;
>  }
> @@ -174,8 +147,7 @@ static void nbd_co_receive_reply(NBDClientSession *s,
>  {
>      int ret;
>  
> -    /* Wait until we're woken up by the read handler.  TODO: perhaps
> -     * peek at the next reply and avoid yielding if it's ours?  */
> +    /* Wait until we're woken up by nbd_read_reply_entry.  */
>      qemu_coroutine_yield();
>      *reply = s->reply;
>      if (reply->handle != request->handle ||
> @@ -209,14 +181,18 @@ static void nbd_coroutine_start(NBDClientSession *s,
>      /* s->recv_coroutine[i] is set as soon as we get the send_lock.  */
>  }
>  
> -static void nbd_coroutine_end(NBDClientSession *s,
> +static void nbd_coroutine_end(BlockDriverState *bs,
>                                NBDRequest *request)
>  {
> +    NBDClientSession *s = nbd_get_client_session(bs);
>      int i = HANDLE_TO_INDEX(s, request->handle);
> +
>      s->recv_coroutine[i] = NULL;
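
(To illustrate the mechanism the quoted commit message relies on: once
nbd_receive_reply runs inside a coroutine, a short read no longer surfaces
as -EAGAIN to an fd handler; the channel code parks the coroutine and
resumes it when the socket becomes readable again.  The sketch below shows
the general qio_channel pattern, not the literal code of this series; ioc,
buf and size are placeholders for the channel and destination buffer.)

    /* Sketch of a coroutine-context channel read: on a partial read,
     * yield inside qio_channel_yield() and let the channel's G_IO_IN
     * watch resume exactly this coroutine.  No send_coroutine or
     * nbd_reply_ready bookkeeping is needed.  */
    ssize_t len;

    do {
        len = qio_channel_read(ioc, buf, size, NULL);
        if (len == QIO_CHANNEL_ERR_BLOCK) {
            qio_channel_yield(ioc, G_IO_IN);  /* sleep until readable */
        }
    } while (len == QIO_CHANNEL_ERR_BLOCK);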

[Qemu-devel] [PATCH 06/16] nbd: do not block on partial reply header reads

2017-01-13 Thread Paolo Bonzini
Read the replies from a coroutine, switching the read side between the
"read header" coroutine and the I/O coroutine that reads the body of
the reply.

qio_channel_yield is used so that the right coroutine is restarted
automatically, eliminating the need for send_coroutine in
NBDClientSession.

Signed-off-by: Paolo Bonzini 
---
 block/nbd-client.c | 108 +
 block/nbd-client.h |   2 +-
 nbd/client.c       |   2 +-
 nbd/common.c       |   9 +
 4 files changed, 45 insertions(+), 76 deletions(-)

diff --git a/block/nbd-client.c b/block/nbd-client.c
index 06f1532..eacc7a5 100644
--- a/block/nbd-client.c
+++ b/block/nbd-client.c
@@ -33,8 +33,9 @@
 #define HANDLE_TO_INDEX(bs, handle) ((handle) ^ ((uint64_t)(intptr_t)bs))
 #define INDEX_TO_HANDLE(bs, index)  ((index)  ^ ((uint64_t)(intptr_t)bs))
 
-static void nbd_recv_coroutines_enter_all(NBDClientSession *s)
+static void nbd_recv_coroutines_enter_all(BlockDriverState *bs)
 {
+    NBDClientSession *s = nbd_get_client_session(bs);
     int i;
 
     for (i = 0; i < MAX_NBD_REQUESTS; i++) {
@@ -42,6 +43,7 @@ static void nbd_recv_coroutines_enter_all(NBDClientSession *s)
             qemu_coroutine_enter(s->recv_coroutine[i]);
         }
     }
+    BDRV_POLL_WHILE(bs, s->read_reply_co);
 }
 
 static void nbd_teardown_connection(BlockDriverState *bs)
@@ -56,7 +58,7 @@ static void nbd_teardown_connection(BlockDriverState *bs)
     qio_channel_shutdown(client->ioc,
                          QIO_CHANNEL_SHUTDOWN_BOTH,
                          NULL);
-    nbd_recv_coroutines_enter_all(client);
+    nbd_recv_coroutines_enter_all(bs);
 
     nbd_client_detach_aio_context(bs);
     object_unref(OBJECT(client->sioc));
@@ -65,54 +67,34 @@ static void nbd_teardown_connection(BlockDriverState *bs)
     client->ioc = NULL;
 }
 
-static void nbd_reply_ready(void *opaque)
+static void nbd_read_reply_entry(void *opaque)
 {
-    BlockDriverState *bs = opaque;
-    NBDClientSession *s = nbd_get_client_session(bs);
+    NBDClientSession *s = opaque;
     uint64_t i;
     int ret;
 
-    if (!s->ioc) { /* Already closed */
-        return;
-    }
-
-    if (s->reply.handle == 0) {
-        /* No reply already in flight.  Fetch a header.  It is possible
-         * that another thread has done the same thing in parallel, so
-         * the socket is not readable anymore.
-         */
+    for (;;) {
+        assert(s->reply.handle == 0);
         ret = nbd_receive_reply(s->ioc, &s->reply);
-        if (ret == -EAGAIN) {
-            return;
-        }
         if (ret < 0) {
-            s->reply.handle = 0;
-            goto fail;
+            break;
         }
-    }
-
-    /* There's no need for a mutex on the receive side, because the
-     * handler acts as a synchronization point and ensures that only
-     * one coroutine is called until the reply finishes.  */
-    i = HANDLE_TO_INDEX(s, s->reply.handle);
-    if (i >= MAX_NBD_REQUESTS) {
-        goto fail;
-    }
 
-    if (s->recv_coroutine[i]) {
-        qemu_coroutine_enter(s->recv_coroutine[i]);
-        return;
-    }
-
-fail:
-    nbd_teardown_connection(bs);
-}
+        /* There's no need for a mutex on the receive side, because the
+         * handler acts as a synchronization point and ensures that only
+         * one coroutine is called until the reply finishes.
+         */
+        i = HANDLE_TO_INDEX(s, s->reply.handle);
+        if (i >= MAX_NBD_REQUESTS || !s->recv_coroutine[i]) {
+            break;
+        }
 
-static void nbd_restart_write(void *opaque)
-{
-    BlockDriverState *bs = opaque;
+        aio_co_wake(s->recv_coroutine[i]);
 
-    qemu_coroutine_enter(nbd_get_client_session(bs)->send_coroutine);
+        /* We're woken up by the recv_coroutine itself.  */
+        qemu_coroutine_yield();
+    }
+    s->read_reply_co = NULL;
 }
 
 static int nbd_co_send_request(BlockDriverState *bs,
@@ -120,7 +102,6 @@ static int nbd_co_send_request(BlockDriverState *bs,
                                QEMUIOVector *qiov)
 {
     NBDClientSession *s = nbd_get_client_session(bs);
-    AioContext *aio_context;
     int rc, ret, i;
 
     qemu_co_mutex_lock(&s->send_mutex);
@@ -141,11 +122,6 @@ static int nbd_co_send_request(BlockDriverState *bs,
         return -EPIPE;
     }
 
-    s->send_coroutine = qemu_coroutine_self();
-    aio_context = bdrv_get_aio_context(bs);
-
-    aio_set_fd_handler(aio_context, s->sioc->fd, false,
-                       nbd_reply_ready, nbd_restart_write, NULL, bs);
     if (qiov) {
         qio_channel_set_cork(s->ioc, true);
         rc = nbd_send_request(s->ioc, request);
@@ -160,9 +136,6 @@ static int nbd_co_send_request(BlockDriverState *bs,
     } else {
         rc = nbd_send_request(s->ioc, request);
     }
-    aio_set_fd_handler(aio_context, s->sioc->fd, false,
-                       nbd_reply_ready, NULL, NULL, bs);
-    s->send_coroutine = NULL;
     qemu_co_mutex_unlock(&s->send_mutex);
     return rc;
 }
@@ -174,8 +147,7 @@ static void nbd_co_receive_reply(NBDClientSession *s,
 {
     int ret;
 
-    /* Wait until we're woken up by the read handler.  TODO: perhaps
-     * peek at the next reply and avoid yielding if it's ours?  */
+    /* Wait until we're woken up by nbd_read_reply_entry.  */
     qemu_coroutine_yield();
     *reply = s->reply;
     if (reply->handle != request->handle ||
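
(Taken together, the hunks above set up a ping-pong between the long-lived
read coroutine and each request coroutine.  The following is an editorial
condensation of that handshake, with error paths omitted; the names are as
in the patch.)

    /* Header side, condensed from nbd_read_reply_entry: read one reply
     * header, wake the request coroutine that owns the handle, then
     * yield until that coroutine has consumed the reply body.  */
    for (;;) {
        if (nbd_receive_reply(s->ioc, &s->reply) < 0) {
            break;
        }
        aio_co_wake(s->recv_coroutine[HANDLE_TO_INDEX(s, s->reply.handle)]);
        qemu_coroutine_yield();        /* until the request finishes */
    }

    /* Request side, condensed from nbd_co_receive_reply: sleep until
     * the read coroutine hands over the header, then copy it out.  */
    qemu_coroutine_yield();            /* woken by read_reply_co */
    *reply = s->reply;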