On Mon, Jun 28, 2010 at 05:47:55PM -0700, Steven Dake wrote:
> This is take 1 of this patch.  It still needs a BSD port; I will
> work on that soon.  Wanted to get some feedback on the code first.
> 
> This fixes a problem where flow control and dispatch are intermixed
> into the same stream of data, resulting in a lockup of the IPC
> system under really heavy load.  It can't really be reproduced in
> corosync today unless running with some experimental patches which
> make cpg_mcast_joined async while toggling flow control on and off.
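
If I'm reading the patch right, the new scheme keeps the socket purely
as a dispatch wakeup and moves the flow control state into shared
memory, so the library send path reduces to something like this (taken
from the coroipcc.c hunk below):

  if (ipc_instance->control_buffer->flow_control_enabled == 1) {
          /* server's outq has backed up - back off and retry */
          res = CS_ERR_TRY_AGAIN;
          goto put_exit;
  }

Is that the intended design?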

Please put all this (OS-specific) sem code somewhere common. It's
getting messy:
  #if _POSIX_THREAD_PROCESS_SHARED > 0
  #else
  #endif

When reading the code it is not obvious what each sem is for
(sem0, sem1, sem2, sem3? - how about real names), e.g.:

  cs_sem_wait(&request_msg_available);
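
i.e. give each semaphore a descriptive name and hide the
_POSIX_THREAD_PROCESS_SHARED split behind one wrapper in a common
header, instead of repeating it at every call site. A rough sketch
(the type and field names here are made up):

  /* hides posix vs sysv semaphores; the sysv side would also carry
   * the EINTR/EACCES retry handling that is currently copy/pasted
   * around.  needs <semaphore.h> and <sys/sem.h>. */
  static int cs_sem_wait (struct cs_sem *sem)
  {
  #if _POSIX_THREAD_PROCESS_SHARED > 0
          return (sem_wait (&sem->posix_sem));
  #else
          struct sembuf sop;

          sop.sem_num = sem->sysv_sem_num;
          sop.sem_op = -1;
          sop.sem_flg = 0;
          return (semop (sem->sysv_semid, &sop, 1));
  #endif
  }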

It is becoming more and more difficult to figure out what is going
on in this file - not good for bug fixing.

The flow control diagnostics were useful - can we have them back?
Both the log messages and the objdb stats.
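
Even with the state now living in shared memory, the server could keep
logging and accounting when it toggles flow_control_enabled, roughly
what flow_control_event_send used to do:

  /* sketch: restore the old diagnostics at the toggle points */
  log_printf (LOGSYS_LEVEL_DEBUG, "Enabling flow control for %d\n",
          conn_info->client_pid);
  api->stats_update_value (conn_info->stats_handle, "flow_control",
          &conn_info->flow_control_state,
          sizeof(conn_info->flow_control_state));
  api->stats_increment_value (conn_info->stats_handle,
          "flow_control_count");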

-Angus

> 
> Regards
> -steve

> Index: test/cpgbench.c
> ===================================================================
> --- test/cpgbench.c   (revision 2971)
> +++ test/cpgbench.c   (working copy)
> @@ -50,6 +50,7 @@
>  #include <sys/socket.h>
>  #include <netinet/in.h>
>  #include <arpa/inet.h>
> +#include <pthread.h>
>  
>  #include <corosync/corotypes.h>
>  #include <corosync/cpg.h>
> @@ -79,6 +80,7 @@
>  
>  static unsigned int write_count;
>  
> +static int my_write = 0;
>  static void cpg_bm_deliver_fn (
>          cpg_handle_t handle,
>          const struct cpg_name *group_name,
> @@ -97,6 +99,16 @@
>  
>  static char data[500000];
>  
> +static void *dispatch_thread (void *param)
> +{
> +     int res;
> +     cpg_handle_t handle = *(cpg_handle_t *)param;
> +
> +     res = cpg_dispatch (handle, CS_DISPATCH_BLOCKING);
> +printf ("thread exiting\n");
> +     return (0);
> +}
> +
>  static void cpg_benchmark (
>       cpg_handle_t handle,
>       int write_size)
> @@ -104,7 +116,6 @@
>       struct timeval tv1, tv2, tv_elapsed;
>       struct iovec iov;
>       unsigned int res;
> -     cpg_flow_control_state_t flow_control_state;
>  
>       alarm_notice = 0;
>       iov.iov_base = data;
> @@ -118,19 +129,11 @@
>               /*
>                * Test checkpoint write
>                */
> -             cpg_flow_control_state_get (handle, &flow_control_state);
> -             if (flow_control_state == CPG_FLOW_CONTROL_DISABLED) {
>  retry:
> -                     res = cpg_mcast_joined (handle, CPG_TYPE_AGREED, &iov, 1);
> -                     if (res == CS_ERR_TRY_AGAIN) {
> -                             goto retry;
> -                     }
> +             res = cpg_mcast_joined (handle, CPG_TYPE_AGREED, &iov, 1);
> +             if (res == CS_ERR_TRY_AGAIN) {
> +                     goto retry;
>               }
> -             res = cpg_dispatch (handle, CS_DISPATCH_ALL);
> -             if (res != CS_OK) {
> -                     printf ("cpg dispatch returned error %d\n", res);
> -                     exit (1);
> -             }
>       } while (alarm_notice == 0);
>       gettimeofday (&tv2, NULL);
>       timersub (&tv2, &tv1, &tv_elapsed);
> @@ -160,14 +163,17 @@
>       unsigned int size;
>       int i;
>       unsigned int res;
> +     pthread_t thread;
>  
> -     size = 1000;
> +
> +     size = 1;
>       signal (SIGALRM, sigalrm_handler);
>       res = cpg_initialize (&handle, &callbacks);
>       if (res != CS_OK) {
>               printf ("cpg_initialize failed with result %d\n", res);
>               exit (1);
>       }
> +     pthread_create (&thread, NULL, dispatch_thread, &handle);
>  
>       res = cpg_join (handle, &group_name);
>       if (res != CS_OK) {
> @@ -183,7 +189,7 @@
>  
>       res = cpg_finalize (handle);
>       if (res != CS_OK) {
> -             printf ("cpg_join failed with result %d\n", res);
> +             printf ("cpg_finalize failed with result %d\n", res);
>               exit (1);
>       }
>       return (0);
> Index: include/corosync/coroipc_ipc.h
> ===================================================================
> --- include/corosync/coroipc_ipc.h    (revision 2971)
> +++ include/corosync/coroipc_ipc.h    (working copy)
> @@ -74,7 +74,9 @@
>       sem_t sem0;
>       sem_t sem1;
>       sem_t sem2;
> +     sem_t sem3;
>  #endif
> +     int flow_control_enabled;
>  };
>  
>  enum res_init_types {
> Index: exec/coroipcs.c
> ===================================================================
> --- exec/coroipcs.c   (revision 2971)
> +++ exec/coroipcs.c   (working copy)
> @@ -94,6 +94,9 @@
>  #define MSG_SEND_LOCKED              0
>  #define MSG_SEND_UNLOCKED    1
>  
> +#define POLL_STATE_IN                1
> +#define POLL_STATE_INOUT     2
> +
>  static struct coroipcs_init_state_v2 *api = NULL;
>  
>  DECLARE_LIST_INIT (conn_info_list_head);
> @@ -140,7 +143,6 @@
>       pthread_attr_t thread_attr;
>       unsigned int service;
>       enum conn_state state;
> -     int notify_flow_control_enabled;
>       int flow_control_state;
>       int refcount;
>       hdb_handle_t stats_handle;
> @@ -165,6 +167,7 @@
>       unsigned int setup_bytes_read;
>       struct list_head zcb_mapped_list_head;
>       char *sending_allowed_private_data[64];
> +     int poll_state;
>  };
>  
>  static int shared_mem_dispatch_bytes_left (const struct conn_info *conn_info);
> @@ -659,6 +662,8 @@
>  #endif
>  
>       for (;;) {
> +             int sem_value;
> +
>  #if _POSIX_THREAD_PROCESS_SHARED > 0
>  retry_semwait:
>               res = sem_wait (&conn_info->control_buffer->sem0);
> @@ -670,6 +675,13 @@
>                       api->stats_increment_value (conn_info->stats_handle, "sem_retry_count");
>                       goto retry_semwait;
>               }
> +             outq_flush (conn_info);
> +             sem_getvalue (&conn_info->control_buffer->sem3, &sem_value);
> +             if (sem_value > 0) {
> +                     sem_wait (&conn_info->control_buffer->sem3);
> +             } else {
> +                     continue;
> +             }
>  #else
>  
>               sop.sem_num = 0;
> @@ -934,6 +946,7 @@
>       conn_info->client_pid = 0;
>       conn_info->service = SOCKET_SERVICE_INIT;
>       conn_info->state = CONN_STATE_THREAD_INACTIVE;
> +        conn_info->poll_state = POLL_STATE_IN;
>       list_init (&conn_info->outq_head);
>       list_init (&conn_info->list);
>       list_init (&conn_info->zcb_mapped_list_head);
> @@ -1272,36 +1285,6 @@
>       conn_info->control_buffer->write = (write_idx + len) % conn_info->dispatch_size;
>  }
>  
> -/**
> - * simulate the behaviour in coroipcc.c
> - */
> -static int flow_control_event_send (struct conn_info *conn_info, char event)
> -{
> -     int new_fc = 0;
> -
> -     if (event == MESSAGE_RES_OUTQ_NOT_EMPTY ||
> -             event == MESSAGE_RES_ENABLE_FLOWCONTROL) {
> -             new_fc = 1;
> -     }
> -
> -     if (conn_info->flow_control_state != new_fc) {
> -             if (new_fc == 1) {
> -                     log_printf (LOGSYS_LEVEL_DEBUG, "Enabling flow control 
> for %d, event %d\n",
> -                             conn_info->client_pid, event);
> -             } else {
> -                     log_printf (LOGSYS_LEVEL_DEBUG, "Disabling flow control 
> for %d, event %d\n",
> -                             conn_info->client_pid, event);
> -             }
> -             conn_info->flow_control_state = new_fc;
> -             api->stats_update_value (conn_info->stats_handle, "flow_control",
> -                     &conn_info->flow_control_state,
> -                     sizeof(conn_info->flow_control_state));
> -             api->stats_increment_value (conn_info->stats_handle, "flow_control_count");
> -     }
> -
> -     return send (conn_info->fd, &event, 1, MSG_NOSIGNAL);
> -}
> -
>  static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len,
>                     int locked)
>  {
> @@ -1311,30 +1294,23 @@
>  #endif
>       int res;
>       int i;
> +     char buf;
>  
>       for (i = 0; i < iov_len; i++) {
>               memcpy_dwrap (conn_info, iov[i].iov_base, iov[i].iov_len);
>       }
>  
> -     if (list_empty (&conn_info->outq_head))
> -             res = flow_control_event_send (conn_info, MESSAGE_RES_OUTQ_EMPTY);
> -     else
> -             res = flow_control_event_send (conn_info, MESSAGE_RES_OUTQ_NOT_EMPTY);
> -
> -     if (res == -1 && errno == EAGAIN) {
> -             if (locked == 0) {
> -                     pthread_mutex_lock (&conn_info->mutex);
> -             }
> +     buf = list_empty (&conn_info->outq_head);
> +     res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
> +     if (res != 1) {
>               conn_info->pending_semops += 1;
> -             if (locked == 0) {
> -                     pthread_mutex_unlock (&conn_info->mutex);
> +             if (conn_info->poll_state == POLL_STATE_IN) {
> +                     conn_info->poll_state = POLL_STATE_INOUT;
> +                     api->poll_dispatch_modify (conn_info->fd,
> +                             POLLIN|POLLOUT|POLLNVAL);
>               }
> -             api->poll_dispatch_modify (conn_info->fd,
> -                     POLLIN|POLLOUT|POLLNVAL);
> -     } else
> -     if (res == -1) {
> -             ipc_disconnect (conn_info);
>       }
> +
>  #if _POSIX_THREAD_PROCESS_SHARED > 0
>       res = sem_post (&conn_info->control_buffer->sem2);
>  #else
> @@ -1364,8 +1340,8 @@
>  
>       pthread_mutex_lock (&conn_info->mutex);
>       if (list_empty (&conn_info->outq_head)) {
> -             res = flow_control_event_send (conn_info, MESSAGE_RES_OUTQ_FLUSH_NR);
>               pthread_mutex_unlock (&conn_info->mutex);
> +             conn_info->control_buffer->flow_control_enabled = 0;
>               return;
>       }
>       for (list = conn_info->outq_head.next;
> @@ -1375,6 +1351,7 @@
>               outq_item = list_entry (list, struct outq_item, list);
>               bytes_left = shared_mem_dispatch_bytes_left (conn_info);
>               if (bytes_left > outq_item->mlen) {
> +printf ("flushing a message\n");
>                       iov.iov_base = outq_item->msg;
>                       iov.iov_len = outq_item->mlen;
>                       msg_send (conn_info, &iov, 1, MSG_SEND_UNLOCKED);
> @@ -1460,6 +1437,7 @@
>               bytes_msg += iov[i].iov_len;
>       }
>       if (bytes_left < bytes_msg || list_empty (&conn_info->outq_head) == 0) {
> +             conn_info->control_buffer->flow_control_enabled = 1;
>               outq_item = api->malloc (sizeof (struct outq_item));
>               if (outq_item == NULL) {
>                       ipc_disconnect (conn);
> @@ -1480,11 +1458,6 @@
>               outq_item->mlen = bytes_msg;
>               list_init (&outq_item->list);
>               pthread_mutex_lock (&conn_info->mutex);
> -             if (list_empty (&conn_info->outq_head)) {
> -                     conn_info->notify_flow_control_enabled = 1;
> -                     api->poll_dispatch_modify (conn_info->fd,
> -                             POLLIN|POLLOUT|POLLNVAL);
> -             }
>               list_add_tail (&outq_item->list, &conn_info->outq_head);
>               pthread_mutex_unlock (&conn_info->mutex);
>               api->stats_increment_value (conn_info->stats_handle, "queue_size");
> @@ -1731,7 +1704,6 @@
>  
>               conn_info->service = req_setup->service;
>               conn_info->refcount = 0;
> -             conn_info->notify_flow_control_enabled = 0;
>               conn_info->setup_bytes_read = 0;
>  
>  #if _POSIX_THREAD_PROCESS_SHARED < 1
> @@ -1783,9 +1755,6 @@
>               res = recv (fd, &buf, 1, MSG_NOSIGNAL);
>               if (res == 1) {
>                       switch (buf) {
> -                     case MESSAGE_REQ_OUTQ_FLUSH:
> -                             outq_flush (conn_info);
> -                             break;
>                       case MESSAGE_REQ_CHANGE_EUID:
>                               if (priv_change (conn_info) == -1) {
>                                       ipc_disconnect (conn_info);
> @@ -1809,37 +1778,24 @@
>               coroipcs_refcount_dec (conn_info);
>       }
>  
> -     coroipcs_refcount_inc (conn_info);
> -     pthread_mutex_lock (&conn_info->mutex);
> -     if ((conn_info->state == CONN_STATE_THREAD_ACTIVE) && (revent & POLLOUT)) {
> -             if (list_empty (&conn_info->outq_head))
> -                     buf = MESSAGE_RES_OUTQ_EMPTY;
> -             else
> -                     buf = MESSAGE_RES_OUTQ_NOT_EMPTY;
> +     if (revent & POLLOUT) {
> +             int psop = conn_info->pending_semops;
> +             int i;
>  
> -             for (; conn_info->pending_semops;) {
> -                     res = flow_control_event_send (conn_info, buf);
> -                     if (res == 1) {
> -                             conn_info->pending_semops--;
> +             assert (psop != 0);
> +             for (i = 0; i < psop; i++) {
> +                     res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
> +                     if (res != 1) {
> +                             return (0);
>                       } else {
> -                             break;
> +                             conn_info->pending_semops -= 1;
>                       }
>               }
> -             if (conn_info->notify_flow_control_enabled) {
> -                     res = flow_control_event_send (conn_info, MESSAGE_RES_ENABLE_FLOWCONTROL);
> -                     if (res == 1) {
> -                             conn_info->notify_flow_control_enabled = 0;
> -                     }
> +             if (conn_info->poll_state == POLL_STATE_INOUT) {
> +                     conn_info->poll_state = POLL_STATE_IN;
> -                     api->poll_dispatch_modify (conn_info->fd, POLLIN|POLLNVAL);
>               }
> -             if (conn_info->notify_flow_control_enabled == 0 &&
> -                     conn_info->pending_semops == 0) {
> -
> -                     api->poll_dispatch_modify (conn_info->fd,
> -                             POLLIN|POLLNVAL);
> -             }
>       }
> -     pthread_mutex_unlock (&conn_info->mutex);
> -     coroipcs_refcount_dec (conn_info);
>  
>       return (0);
>  }
> Index: lib/coroipcc.c
> ===================================================================
> --- lib/coroipcc.c    (revision 2971)
> +++ lib/coroipcc.c    (working copy)
> @@ -116,6 +116,23 @@
>  #define MSG_NOSIGNAL 0
>  #endif
>  
> +static int shared_mem_dispatch_bytes_left (struct ipc_instance *context)
> +{
> +     unsigned int n_read;
> +     unsigned int n_write;
> +     unsigned int bytes_left;
> +
> +     n_read = context->control_buffer->read;
> +     n_write = context->control_buffer->write;
> +
> +     if (n_read <= n_write) {
> +             bytes_left = context->dispatch_size - n_write + n_read;
> +     } else {
> +             bytes_left = n_read - n_write;
> +     }
> +     return (bytes_left);
> +}
> +
>  static cs_error_t
>  socket_send (
>       int s,
> @@ -440,6 +457,10 @@
>       }
>  
>  #if _POSIX_THREAD_PROCESS_SHARED > 0
> +     res = sem_post (&ipc_instance->control_buffer->sem3);
> +     if (res == -1) {
> +             return (CS_ERR_LIBRARY);
> +     }
>       res = sem_post (&ipc_instance->control_buffer->sem0);
>       if (res == -1) {
>               return (CS_ERR_LIBRARY);
> @@ -449,18 +470,35 @@
>        * Signal semaphore #0 indicting a new message from client
>        * to server request queue
>        */
> +     sop.sem_num = 4;
> +     sop.sem_op = 1;
> +     sop.sem_flg = 0;
> +
> +retry_semop_four:
> +     res = semop (ipc_instance->semid, &sop, 1);
> +     if (res == -1 && errno == EINTR) {
> +             return (CS_ERR_TRY_AGAIN);
> +     } else
> +     if (res == -1 && errno == EACCES) {
> +             priv_change_send (ipc_instance);
> +             goto retry_semop_four;
> +     } else
> +     if (res == -1) {
> +             return (CS_ERR_LIBRARY);
> +     }
> +
>       sop.sem_num = 0;
>       sop.sem_op = 1;
>       sop.sem_flg = 0;
>  
> -retry_semop:
> +retry_semop_zero:
>       res = semop (ipc_instance->semid, &sop, 1);
>       if (res == -1 && errno == EINTR) {
>               return (CS_ERR_TRY_AGAIN);
>       } else
>       if (res == -1 && errno == EACCES) {
>               priv_change_send (ipc_instance);
> -             goto retry_semop;
> +             goto retry_semop_zero;
>       } else
>       if (res == -1) {
>               return (CS_ERR_LIBRARY);
> @@ -706,6 +744,7 @@
>       sem_init (&ipc_instance->control_buffer->sem0, 1, 0);
>       sem_init (&ipc_instance->control_buffer->sem1, 1, 0);
>       sem_init (&ipc_instance->control_buffer->sem2, 1, 0);
> +     sem_init (&ipc_instance->control_buffer->sem3, 1, 0);
>  #else
>       /*
>        * Allocate a semaphore segment
> @@ -714,7 +753,7 @@
>               semkey = random();
>               ipc_instance->euid = geteuid ();
>               if ((ipc_instance->semid
> -                  = semget (semkey, 3, IPC_CREAT|IPC_EXCL|0600)) != -1) {
> +                  = semget (semkey, 5, IPC_CREAT|IPC_EXCL|0600)) != -1) {
>                     break;
>               }
>               /*
> @@ -730,18 +769,14 @@
>               }
>       }
>  
> -     semun.val = 0;
> -     sys_res = semctl (ipc_instance->semid, 0, SETVAL, semun);
> -     if (sys_res != 0) {
> -             res = CS_ERR_LIBRARY;
> -             goto error_exit;
> +     for (i = 0; i < 5; i++) {
> +             semun.val = 0;
> +             sys_res = semctl (ipc_instance->semid, i, SETVAL, semun);
> +             if (sys_res != 0) {
> +                     res = CS_ERR_LIBRARY;
> +                     goto error_exit;
> +             }
>       }
> -
> -     sys_res = semctl (ipc_instance->semid, 1, SETVAL, semun);
> -     if (sys_res != 0) {
> -             res = CS_ERR_LIBRARY;
> -             goto error_exit;
> -     }
>  #endif
>  
>       /*
> @@ -877,8 +912,6 @@
>       int poll_events;
>       char buf;
>       struct ipc_instance *ipc_instance;
> -     int res;
> -     char buf_two = 1;
>       char *data_addr;
>       cs_error_t error = CS_OK;
>  
> @@ -911,47 +944,15 @@
>               goto error_put;
>       }
>  
> -     res = recv (ipc_instance->fd, &buf, 1, 0);
> -     if (res == -1 && errno == EINTR) {
> -             error = CS_ERR_TRY_AGAIN;
> -             goto error_put;
> -     } else
> -     if (res == -1) {
> -             error = CS_ERR_LIBRARY;
> -             goto error_put;
> -     } else
> -     if (res == 0) {
> -             /* Means that the peer closed cleanly the socket. However, it should
> -              * happen only on BSD and Darwing systems since poll() returns a
> -              * POLLHUP event on other systems.
> +     error = socket_recv (ipc_instance->fd, &buf, 1);
> +     assert (error == CS_OK);
> +
> +     if (shared_mem_dispatch_bytes_left (ipc_instance) > 500000) {
> +             /*
> +              * Notify coroipcs to flush any pending dispatch messages
>                */
> -             error = CS_ERR_LIBRARY;
> -             goto error_put;
> +             sem_post (&ipc_instance->control_buffer->sem0);
>       }
> -     ipc_instance->flow_control_state = 0;
> -     if (buf == MESSAGE_RES_OUTQ_NOT_EMPTY || buf == MESSAGE_RES_ENABLE_FLOWCONTROL) {
> -             ipc_instance->flow_control_state = 1;
> -     }
> -     /*
> -      * Notify executive to flush any pending dispatch messages
> -      */
> -     if (ipc_instance->flow_control_state) {
> -             buf_two = MESSAGE_REQ_OUTQ_FLUSH;
> -             res = socket_send (ipc_instance->fd, &buf_two, 1);
> -             assert (res == CS_OK); /* TODO */
> -     }
> -     /*
> -      * This is just a notification of flow control starting at the addition
> -      * of a new pending message, not a message to dispatch
> -      */
> -     if (buf == MESSAGE_RES_ENABLE_FLOWCONTROL) {
> -             error = CS_ERR_TRY_AGAIN;
> -             goto error_put;
> -     }
> -     if (buf == MESSAGE_RES_OUTQ_FLUSH_NR) {
> -             error = CS_ERR_TRY_AGAIN;
> -             goto error_put;
> -     }
>  
>       data_addr = ipc_instance->dispatch_buffer;
>  
> @@ -1017,6 +1018,11 @@
>               return (res);
>       }
>  
> +     if (ipc_instance->control_buffer->flow_control_enabled == 1) {
> +             res = CS_ERR_TRY_AGAIN;
> +             goto put_exit;
> +     }
> +
>       pthread_mutex_lock (&ipc_instance->mutex);
>  
>       res = msg_send (ipc_instance, iov, iov_len);
> @@ -1027,8 +1033,9 @@
>       res = reply_receive (ipc_instance, res_msg, res_len);
>  
>  error_exit:
> +     pthread_mutex_unlock (&ipc_instance->mutex);
> +put_exit:
>       hdb_handle_put (&ipc_hdb, handle);
> -     pthread_mutex_unlock (&ipc_instance->mutex);
>  
>       return (res);
>  }

_______________________________________________
Openais mailing list
[email protected]
https://lists.linux-foundation.org/mailman/listinfo/openais
