Re: [Qemu-block] util/aio-posix: Use RCU for handler insertion.

2018-12-06 Thread Remy NOEL
I did some tests and noticed that the second and third patches incur a performance loss (in a scenario using a virtio device).


I will therefore resubmit only the first patch.

On 11/16/18 8:02 PM, remy.n...@blade-group.com wrote:


[Qemu-block] util/aio-posix: Use RCU for handler insertion.

2018-11-16 Thread remy . noel
From: Remy Noel 

Get rid of the deleted attribute.

We still need to get rid of the context list lock.
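
For illustration only (not part of the patch): a minimal sketch of the
reader/writer pattern the handler list moves to, written as if it lived in
util/aio-posix.c so AioContext and AioHandler are in scope.  Only
rcu_read_lock()/rcu_read_unlock(), QLIST_FOREACH_RCU(), QLIST_REMOVE_RCU()
and g_free_rcu() come from the diff below; the two helper functions are
made-up names, not QEMU API.

/* Reader side: walk the handler list inside an RCU read-side critical
 * section.  A node removed concurrently stays valid until the grace
 * period ends, so there is no "deleted" flag left to check. */
static int count_active_handlers(AioContext *ctx)
{
    AioHandler *node;
    int active = 0;

    rcu_read_lock();
    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
        if (node->pfd.events) {
            active++;
        }
    }
    rcu_read_unlock();
    return active;
}

/* Writer side: unlink the node with an RCU-aware remove and defer the
 * actual g_free() until every in-flight reader has finished. */
static void drop_handler(AioContext *ctx, AioHandler *node)
{
    QLIST_REMOVE_RCU(node, node);
    g_free_rcu(node, rcu);
}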

Signed-off-by: Remy Noel 
---
 util/aio-posix.c | 75 ++--
 util/aio-win32.c | 43 ++-
 2 files changed, 49 insertions(+), 69 deletions(-)

diff --git a/util/aio-posix.c b/util/aio-posix.c
index b34d97292a..83db3f65f4 100644
--- a/util/aio-posix.c
+++ b/util/aio-posix.c
@@ -16,6 +16,7 @@
 #include "qemu/osdep.h"
 #include "qemu-common.h"
 #include "block/block.h"
+#include "qemu/rcu.h"
 #include "qemu/rcu_queue.h"
 #include "qemu/sockets.h"
 #include "qemu/cutils.h"
@@ -26,13 +27,14 @@
 
 struct AioHandler
 {
+struct rcu_head rcu;
+
 GPollFD pfd;
 IOHandler *io_read;
 IOHandler *io_write;
 AioPollFn *io_poll;
 IOHandler *io_poll_begin;
 IOHandler *io_poll_end;
-int deleted;
 void *opaque;
 bool is_external;
 QLIST_ENTRY(AioHandler) node;
@@ -65,19 +67,25 @@ static bool aio_epoll_try_enable(AioContext *ctx)
 {
 AioHandler *node;
 struct epoll_event event;
+int r = 0;
+
 
+rcu_read_lock();
 QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
-int r;
-if (node->deleted || !node->pfd.events) {
+if (!node->pfd.events) {
 continue;
 }
 event.events = epoll_events_from_pfd(node->pfd.events);
 event.data.ptr = node;
 r = epoll_ctl(ctx->epollfd, EPOLL_CTL_ADD, node->pfd.fd, &event);
 if (r) {
-return false;
+break;
 }
 }
+rcu_read_unlock();
+if (r) {
+return false;
+}
 ctx->epoll_enabled = true;
 return true;
 }
@@ -193,14 +201,13 @@ static AioHandler *find_aio_handler(AioContext *ctx, int fd)
 
 QLIST_FOREACH(node, &ctx->aio_handlers, node) {
 if (node->pfd.fd == fd)
-if (!node->deleted)
-return node;
+return node;
 }
 
 return NULL;
 }
 
-static bool aio_remove_fd_handler(AioContext *ctx, AioHandler *node)
+static void aio_remove_fd_handler(AioContext *ctx, AioHandler *node)
 {
 /* If the GSource is in the process of being destroyed then
  * g_source_remove_poll() causes an assertion failure.  Skip
@@ -210,19 +217,7 @@ static bool aio_remove_fd_handler(AioContext *ctx, AioHandler *node)
 if (!g_source_is_destroyed(&ctx->source)) {
 g_source_remove_poll(&ctx->source, &node->pfd);
 }
-
-/* If a read is in progress, just mark the node as deleted */
-if (qemu_lockcnt_count(&ctx->list_lock)) {
-node->deleted = 1;
-node->pfd.revents = 0;
-return false;
-}
-/* Otherwise, delete it for real.  We can't just mark it as
- * deleted because deleted nodes are only cleaned up while
- * no one is walking the handlers list.
- */
-QLIST_REMOVE(node, node);
-return true;
+QLIST_REMOVE_RCU(node, node);
 }
 
 void aio_set_fd_handler(AioContext *ctx,
@@ -249,7 +244,8 @@ void aio_set_fd_handler(AioContext *ctx,
 qemu_lockcnt_unlock(&ctx->list_lock);
 return;
 }
-deleted = aio_remove_fd_handler(ctx, node);
+aio_remove_fd_handler(ctx, node);
+deleted = true;
 poll_disable_change = -!node->io_poll;
 } else {
 poll_disable_change = !io_poll - (node && !node->io_poll);
@@ -269,7 +265,8 @@ void aio_set_fd_handler(AioContext *ctx,
 if (is_new) {
 new_node->pfd.fd = fd;
 } else {
-deleted = aio_remove_fd_handler(ctx, node);
+aio_remove_fd_handler(ctx, node);
+deleted = true;
 new_node->pfd = node->pfd;
 }
 g_source_add_poll(&ctx->source, &new_node->pfd);
@@ -296,7 +293,7 @@ void aio_set_fd_handler(AioContext *ctx,
 aio_notify(ctx);
 
 if (deleted) {
-g_free(node);
+g_free_rcu(node, rcu);
 }
 }
 
@@ -345,13 +342,10 @@ static void poll_set_started(AioContext *ctx, bool started)
 ctx->poll_started = started;
 
 qemu_lockcnt_inc(&ctx->list_lock);
+rcu_read_lock();
 QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
 IOHandler *fn;
 
-if (node->deleted) {
-continue;
-}
-
 if (started) {
 fn = node->io_poll_begin;
 } else {
@@ -362,6 +356,7 @@ static void poll_set_started(AioContext *ctx, bool started)
 fn(node->opaque);
 }
 }
+rcu_read_unlock();
 qemu_lockcnt_dec(&ctx->list_lock);
 }
 
@@ -385,6 +380,7 @@ bool aio_pending(AioContext *ctx)
  */
 qemu_lockcnt_inc(&ctx->list_lock);
 
+rcu_read_lock();
 QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
 int revents;
 
@@ -400,6 +396,7 @@ bool aio_pending(AioContext *ctx)
 break;
 }
 }
+rcu_read_unlock();
 qemu_lockcnt_dec(&ctx->list_lock);
 
 return result;
@@ -410,14 +407,14 @@ static bool aio_dispatch_handlers(AioContext *ctx)
 AioHandler *node, *tmp;
 bool progress = false;
 
+