Is there any reason not to apply this patch for now?

On Tue, Jun 05, 2012 at 08:07:34AM -0400, Christoph Hellwig wrote:
> Let sd_block_handler deal with the fine details of handling an incoming
> blocking event.  By passing in the sender node structure it can easily
> ignore the event on every node other than the sender, and by keeping a
> local operation-in-progress flag in group.c we can replace the callbacked
> flag in the on-the-wire events with a much simpler mechanism.
> 
> The only slightly complicated bit is that zk_notify_blocked in the zookeeper
> backend can now go negative for a short period of time, so we have to
> explicitly check for it being positive in two places.
> 
> Signed-off-by: Christoph Hellwig <[email protected]>
> 
> diff --git a/sheep/cluster.h b/sheep/cluster.h
> index 07e5f7b..80865cf 100644
> --- a/sheep/cluster.h
> +++ b/sheep/cluster.h
> @@ -11,6 +11,7 @@
>  #ifndef __CLUSTER_H__
>  #define __CLUSTER_H__
>  
> +#include <stdbool.h>
>  #include <stdio.h>
>  #include <stdlib.h>
>  #include <stdint.h>
> @@ -186,7 +187,7 @@ void sd_join_handler(struct sd_node *joined, struct sd_node *members,
>  void sd_leave_handler(struct sd_node *left, struct sd_node *members,
>               size_t nr_members);
>  void sd_notify_handler(struct sd_node *sender, void *msg, size_t msg_len);
> -void sd_block_handler(void);
> +bool sd_block_handler(struct sd_node *sender);
>  enum cluster_join_result sd_check_join_cb(struct sd_node *joining,
>               void *opaque);
>  
> diff --git a/sheep/cluster/accord.c b/sheep/cluster/accord.c
> index abb6db1..013b978 100644
> --- a/sheep/cluster/accord.c
> +++ b/sheep/cluster/accord.c
> @@ -47,8 +47,6 @@ struct acrd_event {
>       uint64_t ids[SD_MAX_NODES];
>  
>       enum cluster_join_result join_result;
> -
> -     int callbacked; /* set non-zero after sd_block_handler() was called */
>  };
>  
>  static struct sd_node this_node;
> @@ -564,14 +562,8 @@ static void acrd_handler(int listen_fd, int events, void *data)
>               sd_leave_handler(&ev.sender, ev.nodes, ev.nr_nodes);
>               break;
>       case EVENT_BLOCK:
> -             if (node_cmp(&ev.sender, &this_node) == 0 && !ev.callbacked) {
> -                     ev.callbacked = 1;
> -
> -                     acrd_queue_push_back(ahandle, &ev);
> -                     sd_block_handler();
> -             } else {
> -                     acrd_queue_push_back(ahandle, NULL);
> -             }
> +             acrd_queue_push_back(ahandle, NULL);
> +             sd_block_handler(&ev.sender);
>               break;
>       case EVENT_NOTIFY:
>               sd_notify_handler(&ev.sender, ev.buf, ev.buf_len);
> diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c
> index 77c6710..d42a4cb 100644
> --- a/sheep/cluster/corosync.c
> +++ b/sheep/cluster/corosync.c
> @@ -308,13 +308,9 @@ static int __corosync_dispatch_one(struct corosync_event *cevent)
>               sd_leave_handler(&cevent->sender.ent, entries, nr_cpg_nodes);
>               break;
>       case COROSYNC_EVENT_TYPE_BLOCK:
> -             if (cpg_node_equal(&cevent->sender, &this_node) &&
> -                 !cevent->callbacked) {
> -                     sd_block_handler();
> -                     cevent->callbacked = 1;
> -             }
> +             sd_block_handler(&cevent->sender.ent);
>  
> -             /* block the rest messages until unblock message comes */
> +             /* block other messages until the unblock message comes */
>               return 0;
>       case COROSYNC_EVENT_TYPE_NOTIFY:
>               sd_notify_handler(&cevent->sender.ent, cevent->msg,
> diff --git a/sheep/cluster/local.c b/sheep/cluster/local.c
> index 2c3ce24..dd3936b 100644
> --- a/sheep/cluster/local.c
> +++ b/sheep/cluster/local.c
> @@ -52,8 +52,6 @@ struct local_event {
>       pid_t pids[SD_MAX_NODES];
>  
>       enum cluster_join_result join_result;
> -
> -     int callbacked; /* set non-zero after sd_block_handler() was called */
>  };
>  
>  
> @@ -405,10 +403,7 @@ static void local_handler(int listen_fd, int events, void *data)
>               shm_queue_pop();
>               break;
>       case EVENT_BLOCK:
> -             if (node_eq(&ev->sender, &this_node) && !ev->callbacked) {
> -                     sd_block_handler();
> -                     ev->callbacked = 1;
> -             }
> +             sd_block_handler(&ev->sender);
>               break;
>       case EVENT_NOTIFY:
>               sd_notify_handler(&ev->sender, ev->buf, ev->buf_len);
> diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
> index 3dde1e4..6bd8e3a 100644
> --- a/sheep/cluster/zookeeper.c
> +++ b/sheep/cluster/zookeeper.c
> @@ -61,8 +61,6 @@ struct zk_event {
>  
>       enum cluster_join_result join_result;
>  
> -     int callbacked; /* set non-zero after sd_block_handler() was called */
> -
>       size_t buf_len;
>       uint8_t buf[SD_MAX_EVENT_BUF_SIZE];
>  };
> @@ -243,7 +241,7 @@ static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
>       eventfd_t value = 1;
>  
>       /* process leave event */
> -     if (!uatomic_read(&zk_notify_blocked) &&
> +     if (uatomic_read(&zk_notify_blocked) <= 0 &&
>            uatomic_read(&nr_zk_levents)) {
>               nr_levents = uatomic_sub_return(&nr_zk_levents, 1) + 1;
>               dprintf("nr_zk_levents:%d, head:%u\n", nr_levents, zk_levent_head);
> @@ -498,7 +496,6 @@ static int add_event(zhandle_t *zh, enum zk_event_type type,
>       ev.type = type;
>       ev.sender = *znode;
>       ev.buf_len = buf_len;
> -     ev.callbacked = 0;
>       if (buf)
>               memcpy(ev.buf, buf, buf_len);
>       zk_queue_push(zh, &ev);
> @@ -515,7 +512,6 @@ static int leave_event(zhandle_t *zh, struct zk_node *znode)
>       ev->type = EVENT_LEAVE;
>       ev->sender = *znode;
>       ev->buf_len = 0;
> -     ev->callbacked = 0;
>  
>       nr_levents = uatomic_add_return(&nr_zk_levents, 1);
>       dprintf("nr_zk_levents:%d, tail:%u\n", nr_levents, zk_levent_tail);
> @@ -714,7 +710,7 @@ static void zk_handler(int listen_fd, int events, void *data)
>       if (ret < 0)
>               return;
>  
> -     if (uatomic_read(&zk_notify_blocked))
> +     if (uatomic_read(&zk_notify_blocked) > 0)
>               return;
>  
>       ret = zk_queue_pop(zhandle, &ev);
> @@ -820,16 +816,9 @@ static void zk_handler(int listen_fd, int events, void *data)
>               break;
>       case EVENT_BLOCK:
>               dprintf("BLOCK\n");
> -             if (node_eq(&ev.sender.node, &this_node.node)
> -                             && !ev.callbacked) {
> -                     uatomic_inc(&zk_notify_blocked);
> -                     ev.callbacked = 1;
> -                     zk_queue_push_back(zhandle, &ev);
> -                     sd_block_handler();
> -             } else {
> -                     zk_queue_push_back(zhandle, NULL);
> -             }
> -
> +             zk_queue_push_back(zhandle, NULL);
> +             if (sd_block_handler(&ev.sender.node))
> +                     uatomic_inc(&zk_notify_blocked);
>               break;
>       case EVENT_NOTIFY:
>               dprintf("NOTIFY\n");
> diff --git a/sheep/group.c b/sheep/group.c
> index c2679f2..d00a121 100644
> --- a/sheep/group.c
> +++ b/sheep/group.c
> @@ -251,6 +251,11 @@ int get_nr_copies(struct vnode_info *vnode_info)
>       return min(vnode_info->nr_zones, sys->nr_copies);
>  }
>  
> +/*
> + * Indicator if a cluster operation is currently running.
> + */
> +static bool cluster_op_running = false;
> +
>  static struct vdi_op_message *prepare_cluster_msg(struct request *req,
>               size_t *sizep)
>  {
> @@ -295,6 +300,8 @@ static void cluster_op_done(struct work *work)
>       struct vdi_op_message *msg;
>       size_t size;
>  
> +     cluster_op_running = false;
> +
>       msg = prepare_cluster_msg(req, &size);
>       if (!msg)
>               panic();
> @@ -305,20 +312,33 @@ static void cluster_op_done(struct work *work)
>  }
>  
>  /*
> - * Perform a blocked cluster operation.
> + * Perform a blocked cluster operation if we were the node requesting it
> + * and do not have any other operation pending.
>   *
> - * Must run in the main thread as it access unlocked state like
> + * If this method returns false the caller must call the method again for
> + * the same event once it gets notified again.
> + *
> + * Must run in the main thread as it accesses unlocked state like
>   * sys->pending_list.
>   */
> -void sd_block_handler(void)
> +bool sd_block_handler(struct sd_node *sender)
>  {
> -     struct request *req = list_first_entry(&sys->pending_list,
> -                                             struct request, pending_list);
> +     struct request *req;
> +
> +     if (!node_eq(sender, &sys->this_node))
> +             return false;
> +     if (cluster_op_running)
> +             return false;
> +
> +     cluster_op_running = true;
>  
> +     req = list_first_entry(&sys->pending_list,
> +                             struct request, pending_list);
>       req->work.fn = do_cluster_request;
>       req->work.done = cluster_op_done;
>  
>       queue_work(sys->block_wqueue, &req->work);
> +     return true;
>  }
>  
>  /*
> -- 
> sheepdog mailing list
> [email protected]
> http://lists.wpkg.org/mailman/listinfo/sheepdog
---end quoted text---
-- 
sheepdog mailing list
[email protected]
http://lists.wpkg.org/mailman/listinfo/sheepdog

Reply via email to